This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ffc9b8  [improvement] json load by line (#23)
2ffc9b8 is described below

commit 2ffc9b8a37ffbd05d33334df434001d18d5b177a
Author: madong <[email protected]>
AuthorDate: Fri Apr 8 09:29:40 2022 +0800

    [improvement] json load by line (#23)
---
 .../java/org/apache/doris/flink/sink/DorisSink.java     |  4 +++-
 .../apache/doris/flink/sink/writer/DorisStreamLoad.java | 17 ++---------------
 .../apache/doris/flink/table/DorisDynamicTableSink.java |  3 +--
 .../java/org/apache/doris/flink/DorisSinkExample.java   |  2 ++
 .../org/apache/doris/flink/DorisSinkSQLExample.java     |  4 +++-
 .../org/apache/doris/flink/DorisSourceSinkExample.java  |  4 ++--
 .../doris/flink/sink/writer/TestDorisStreamLoad.java    |  2 +-
 7 files changed, 14 insertions(+), 22 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index 0c0da3f..efb530e 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -126,10 +126,12 @@ public class DorisSink<IN> implements Sink<IN, DorisCommittable, DorisWriterStat
 
         public DorisSink<IN> build() {
             Preconditions.checkNotNull(dorisOptions);
-            Preconditions.checkNotNull(dorisReadOptions);
             Preconditions.checkNotNull(dorisExecutionOptions);
             Preconditions.checkNotNull(serializer);
             EscapeHandler.handleEscape(dorisExecutionOptions.getStreamLoadProp());
+            if(dorisReadOptions == null) {
+                dorisReadOptions = DorisReadOptions.builder().build();
+            }
             return new DorisSink<>(dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer);
         }
     }
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index be9573f..7849559 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -58,6 +58,7 @@ import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
 import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
 import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
 import static org.apache.doris.flink.sink.writer.LoadConstants.JSON;
+import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_DEFAULT;
 import static org.apache.doris.flink.sink.writer.LoadConstants.LINE_DELIMITER_KEY;
 
 /**
@@ -70,8 +71,6 @@ public class DorisStreamLoad implements Serializable {
     private final byte[] lineDelimiter;
     private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
     private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
-    private static final byte[] JSON_ARRAY_START = "[".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] JSON_ARRAY_END = "]".getBytes(StandardCharsets.UTF_8);
     private static final String JOB_EXIST_FINISHED = "FINISHED";
 
     private String loadUrlStr;
@@ -86,7 +85,6 @@ public class DorisStreamLoad implements Serializable {
     private Future<CloseableHttpResponse> pendingLoadFuture;
     private final CloseableHttpClient httpClient;
     private final ExecutorService executorService;
-    private final String format;
     private boolean loadBatchFirstRecord;
 
     public DorisStreamLoad(String hostPort,
@@ -109,12 +107,7 @@ public class DorisStreamLoad implements Serializable {
                 0L, TimeUnit.MILLISECONDS,
                 new LinkedBlockingQueue<>(), new ExecutorThreadFactory("stream-load-upload"));
         this.recordStream = new RecordStream(executionOptions.getBufferSize(), executionOptions.getBufferCount());
-        this.format = executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY, CSV);
-        if (JSON.equals(format)) {
-            lineDelimiter = ",".getBytes();
-        } else {
-            lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, "\n").getBytes();
-        }
+        lineDelimiter = streamLoadProp.getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT).getBytes();
         loadBatchFirstRecord = true;
     }
 
@@ -219,9 +212,6 @@ public class DorisStreamLoad implements Serializable {
     }
 
     public RespContent stopLoad() throws IOException{
-        if (JSON.equals(format)) {
-            recordStream.write(JSON_ARRAY_END);
-        }
         recordStream.endInput();
         LOG.info("stream load stopped.");
         Preconditions.checkState(pendingLoadFuture != null);
@@ -261,9 +251,6 @@ public class DorisStreamLoad implements Serializable {
             LOG.warn(err, e);
             throw e;
         }
-        if (JSON.equals(format)) {
-            recordStream.write(JSON_ARRAY_START);
-        }
     }
 
     private void abortTransaction(long txnID) throws Exception {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 81e42ad..53d727f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -77,8 +77,7 @@ public class DorisDynamicTableSink implements DynamicTableSink {
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         Properties loadProperties = executionOptions.getStreamLoadProp();
-        boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) && executionOptions.getDeletable();
-
+        boolean deletable = RestService.isUniqueKeyType(options, readOptions, LOG) || executionOptions.getDeletable();
         if (!loadProperties.containsKey(COLUMNS_KEY)) {
             String[] fieldNames = tableSchema.getFieldNames();
             Preconditions.checkState(fieldNames != null && fieldNames.length > 0);
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
index e71b85b..9c459b7 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
@@ -21,6 +21,7 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.writer.SimpleStringSerializer;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -40,6 +41,7 @@ public class DorisSinkExample {
     public static void main(String[] args) throws Exception{
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         env.enableCheckpointing(10000);
         env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
index 8965e97..87da571 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkSQLExample.java
@@ -16,6 +16,7 @@
 // under the License.
 package org.apache.doris.flink;
 
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -31,6 +32,7 @@ public class DorisSinkSQLExample {
     public static void main(String[] args) {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         List<Tuple2<String, Integer>> data = new ArrayList<>();
@@ -53,7 +55,7 @@ public class DorisSinkSQLExample {
                         "  'sink.buffer-count' = '4',\n" +
                         "  'sink.buffer-size' = '4086'," +
                         "  'sink.label-prefix' = 'doris_label',\n" +
-                        "  'sink.properties.strip_outer_array' = 'true'\n" +
+                        "  'sink.properties.read_json_by_line' = 'true'\n" +
                         ")");
         tEnv.executeSql("INSERT INTO doris_test_sink select name,age from doris_test");
     }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
index c4ce1a5..60524c8 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java
@@ -55,8 +55,8 @@ public class DorisSourceSinkExample {
                         "  'table.identifier' = 'db.table',\n" +
                         "  'username' = 'root',\n" +
                         "  'password' = '',\n" +
-                        "  'sink.batch.size' = '3',\n" +
-                        "  'sink.max-retries' = '2'\n" +
+                        "  'sink.properties.format' = 'csv',\n" +
+                        "  'sink.label-prefix' = 'doris_csv_table'\n" +
                         ")");
 
         tEnv.executeSql("INSERT INTO doris_test_sink select name,age,price,sale from doris_test");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
index d1c6276..b401a5b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisStreamLoad.java
@@ -106,7 +106,7 @@ public class TestDorisStreamLoad {
         properties.setProperty("line_delimiter", "\n");
         properties.setProperty("format", "json");
         executionOptions = OptionUtils.buildExecutionOptional(properties);
-        byte[] expectBuffer = "[{\"id\": 1},{\"id\": 2}]".getBytes(StandardCharsets.UTF_8);
+        byte[] expectBuffer = "{\"id\": 1}\n{\"id\": 2}".getBytes(StandardCharsets.UTF_8);
         CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
         CloseableHttpResponse preCommitResponse = HttpTestUtil.getResponse(HttpTestUtil.PRE_COMMIT_RESPONSE, true);
         when(httpClient.execute(any())).thenReturn(preCommitResponse);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to