This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ebb4c28 [Flink]Simplify the use of flink connector (#6892)
ebb4c28 is described below
commit ebb4c282b1bd65f032c52526c6fad48793e3d016
Author: xiaokangguo <[email protected]>
AuthorDate: Sat Oct 23 18:10:47 2021 +0800
[Flink]Simplify the use of flink connector (#6892)
1. Simplify the use of the Flink connector so it can be used like other
stream sinks, via GenericDorisSinkFunction.
2. Add use cases for the Flink connector.
## Use case
```
env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
.addSink(
DorisSink.sink(
DorisOptions.builder()
.setFenodes("FE_IP:8030")
.setTableIdentifier("db.table")
.setUsername("root")
.setPassword("").build()
));
```
---
docs/en/extending-doris/flink-doris-connector.md | 107 +++++++++-
.../zh-CN/extending-doris/flink-doris-connector.md | 106 +++++++++-
.../doris/flink/cfg/DorisExecutionOptions.java | 20 +-
.../apache/doris/flink/cfg/DorisReadOptions.java | 5 +
.../java/org/apache/doris/flink/cfg/DorisSink.java | 95 +++++++++
.../doris/flink/cfg/GenericDorisSinkFunction.java | 53 +++++
.../flink/table/DorisDynamicOutputFormat.java | 77 +++++---
.../doris/flink/DorisOutPutFormatExample.java | 84 ++++++++
.../apache/doris/flink/DorisStreamSinkExample.java | 219 +++++++++++++++++++++
9 files changed, 728 insertions(+), 38 deletions(-)
diff --git a/docs/en/extending-doris/flink-doris-connector.md
b/docs/en/extending-doris/flink-doris-connector.md
index 3046169..c42d237 100644
--- a/docs/en/extending-doris/flink-doris-connector.md
+++ b/docs/en/extending-doris/flink-doris-connector.md
@@ -119,7 +119,7 @@ CREATE TABLE flink_doris_sink (
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
```
-### DataStream
+### DataStreamSource
```scala
Properties properties = new Properties();
@@ -130,6 +130,111 @@ INSERT INTO flink_doris_sink select name,age,price,sale
from flink_doris_source
env.addSource(new DorisSourceFunction(new DorisStreamOptions(properties),new
SimpleListDeserializationSchema())).print();
```
+### DataStreamSink
+
+```java
+// -------- sink with raw json string stream --------
+Properties pro = new Properties();
+pro.setProperty("format", "json");
+pro.setProperty("strip_outer_array", "true");
+env.fromElements( "{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0l)
+ .setMaxRetries(3)
+ .setStreamLoadProp(pro).build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+
+OR
+env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+
+
+// -------- sink with RowData stream --------
+DataStream<RowData> source = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0, StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+String[] fields = {"city", "longitude", "latitude"};
+LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+
+source.addSink(
+ DorisSink.sink(
+ fields,
+ types,
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0L)
+ .setMaxRetries(3)
+ .build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+```
+
+### DataSetSink
+
+```java
+MapOperator<String, RowData> data = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0, StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+DorisOptions dorisOptions = DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build();
+DorisReadOptions readOptions = DorisReadOptions.defaults();
+DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();
+
+LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+String[] fiels = {"city", "longitude", "latitude"};
+
+DorisDynamicOutputFormat outputFormat =
+ new DorisDynamicOutputFormat(dorisOptions, readOptions, executionOptions,
types, fiels);
+
+outputFormat.open(0, 1);
+data.output(outputFormat);
+outputFormat.close();
+```
+
+
+
### General
| Key | Default Value | Comment
|
diff --git a/docs/zh-CN/extending-doris/flink-doris-connector.md
b/docs/zh-CN/extending-doris/flink-doris-connector.md
index f9dca64..9ea1eba 100644
--- a/docs/zh-CN/extending-doris/flink-doris-connector.md
+++ b/docs/zh-CN/extending-doris/flink-doris-connector.md
@@ -120,7 +120,7 @@ CREATE TABLE flink_doris_sink (
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
```
-### DataStream
+### DataStreamSource
```java
Properties properties = new Properties();
@@ -131,6 +131,110 @@ INSERT INTO flink_doris_sink select name,age,price,sale
from flink_doris_source
env.addSource(new DorisSourceFunction(new DorisStreamOptions(properties),new
SimpleListDeserializationSchema())).print();
```
+### DataStreamSink
+
+```java
+// -------- sink with raw json string stream --------
+Properties pro = new Properties();
+pro.setProperty("format", "json");
+pro.setProperty("strip_outer_array", "true");
+env.fromElements( "{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0l)
+ .setMaxRetries(3)
+ .setStreamLoadProp(pro).build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+
+OR
+env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+
+
+// -------- sink with RowData stream --------
+DataStream<RowData> source = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0, StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+String[] fields = {"city", "longitude", "latitude"};
+LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+
+source.addSink(
+ DorisSink.sink(
+ fields,
+ types,
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0L)
+ .setMaxRetries(3)
+ .build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+```
+
+### DataSetSink
+
+```java
+MapOperator<String, RowData> data = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0, StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+DorisOptions dorisOptions = DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build();
+DorisReadOptions readOptions = DorisReadOptions.defaults();
+DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();
+
+LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+String[] fiels = {"city", "longitude", "latitude"};
+
+DorisDynamicOutputFormat outputFormat =
+ new DorisDynamicOutputFormat(dorisOptions, readOptions, executionOptions,
types, fiels);
+
+outputFormat.open(0, 1);
+data.output(outputFormat);
+outputFormat.close();
+```
+
+
## 配置
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 3d035ab..587ab07 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.cfg;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
-import java.time.Duration;
import java.util.Properties;
/**
@@ -29,6 +28,10 @@ import java.util.Properties;
public class DorisExecutionOptions implements Serializable {
private static final long serialVersionUID = 1L;
+ public static final Integer DEFAULT_BATCH_SIZE = 1000;
+ public static final Integer DEFAULT_MAX_RETRY_TIMES = 3;
+ private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
+
private final Integer batchSize;
private final Integer maxRetries;
private final Long batchIntervalMs;
@@ -66,14 +69,21 @@ public class DorisExecutionOptions implements Serializable {
return new Builder();
}
+ public static DorisExecutionOptions defaults() {
+ Properties pro = new Properties();
+ pro.setProperty("format", "json");
+ pro.setProperty("strip_outer_array", "true");
+ return new Builder().setStreamLoadProp(pro).build();
+ }
+
/**
* Builder of {@link DorisExecutionOptions}.
*/
public static class Builder {
- private Integer batchSize;
- private Integer maxRetries;
- private Long batchIntervalMs;
- private Properties streamLoadProp;
+ private Integer batchSize = DEFAULT_BATCH_SIZE;
+ private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
+ private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+ private Properties streamLoadProp = new Properties();
public Builder setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 53cefaa..0beb18c 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -103,6 +103,10 @@ public class DorisReadOptions implements Serializable {
return new Builder();
}
+ public static DorisReadOptions defaults(){
+ return DorisReadOptions.builder().build();
+ }
+
/**
* Builder of {@link DorisReadOptions}.
*/
@@ -179,6 +183,7 @@ public class DorisReadOptions implements Serializable {
public DorisReadOptions build() {
return new DorisReadOptions(readFields, filterQuery,
requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs,
requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit,
deserializeQueueSize, deserializeArrowAsync);
}
+
}
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
new file mode 100644
index 0000000..f11c587
--- /dev/null
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
@@ -0,0 +1,95 @@
+package org.apache.doris.flink.cfg;
+
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Facade to create Doris {@link SinkFunction sinks}. */
+public class DorisSink {
+
+
+ private DorisSink() {
+ }
+
+
+ /**
+ * Create a Doris DataStream sink with the default {@link DorisReadOptions}
+ * stream elements could only be JsonString.
+ *
+ * @see #sink(String[], LogicalType[], DorisReadOptions,
DorisExecutionOptions, DorisOptions)
+ */
+ public static <T> SinkFunction<T> sink(DorisExecutionOptions
executionOptions, DorisOptions dorisOptions) {
+
+ return sink(new String[]{}, new LogicalType[]{},
DorisReadOptions.defaults(), executionOptions, dorisOptions);
+ }
+
+ /**
+ * Create a Doris DataStream sink with the default {@link DorisReadOptions}
+ * stream elements could only be RowData.
+ *
+ * @see #sink(String[], LogicalType[], DorisReadOptions,
DorisExecutionOptions, DorisOptions)
+ */
+ public static <T> SinkFunction<T> sink(String[] fiels, LogicalType[] types,
+ DorisExecutionOptions
executionOptions, DorisOptions dorisOptions) {
+
+ return sink(fiels, types, DorisReadOptions.defaults(),
executionOptions, dorisOptions);
+ }
+
+ /**
+ * Create a Doris DataStream sink with the default {@link
DorisExecutionOptions}
+ * stream elements could only be JsonString.
+ *
+ * @see #sink(String[], LogicalType[], DorisReadOptions,
DorisExecutionOptions, DorisOptions)
+ */
+ public static <T> SinkFunction<T> sink(DorisOptions dorisOptions) {
+
+ return sink(new String[]{}, new LogicalType[]{},
DorisReadOptions.defaults(),
+ DorisExecutionOptions.defaults(), dorisOptions);
+ }
+
+ /**
+ * Create a Doris DataStream sink with the default {@link
DorisExecutionOptions}
+ * stream elements could only be RowData.
+ *
+ * @see #sink(String[], LogicalType[], DorisReadOptions,
DorisExecutionOptions, DorisOptions)
+ */
+ public static <T> SinkFunction<T> sink(String[] fiels, LogicalType[]
types, DorisOptions dorisOptions) {
+ return sink(fiels, types, DorisReadOptions.defaults(),
DorisExecutionOptions.defaults(), dorisOptions);
+ }
+
+
+ /**
+ * Create a Doris DataStream sink, stream elements could only be
JsonString.
+ *
+ * @see #sink(String[], LogicalType[], DorisReadOptions,
DorisExecutionOptions, DorisOptions)
+ */
+ public static <T> SinkFunction<T> sink(DorisReadOptions readOptions,
+ DorisExecutionOptions
executionOptions, DorisOptions dorisOptions) {
+
+ return sink(new String[]{}, new LogicalType[]{}, readOptions,
executionOptions, dorisOptions);
+ }
+
+
+ /**
+ * Create a Doris DataStream sink, stream elements could only be RowData.
+ *
+ * <p>Note: the objects passed to the return sink can be processed in
batch and retried.
+ * Therefore, objects can not be {@link
org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
+ * </p>
+ *
+ * @param field array of field
+ * @param types types of field
+ * @param readOptions parameters of read, such as readFields,
filterQuery
+ * @param executionOptions parameters of execution, such as batch size and
maximum retries
+ * @param dorisOptions parameters of options, such as fenodes,
username, password, tableIdentifier
+ * @param <T> type of data in {@link
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+ * StreamRecord}.
+ */
+ public static <T> SinkFunction<T> sink(String[] field, LogicalType[]
types, DorisReadOptions readOptions,
+ DorisExecutionOptions
executionOptions, DorisOptions dorisOptions) {
+
+ return new GenericDorisSinkFunction(new DorisDynamicOutputFormat(
+ dorisOptions, readOptions, executionOptions, types, field));
+ }
+
+}
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
new file mode 100644
index 0000000..92dd300
--- /dev/null
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
@@ -0,0 +1,53 @@
+package org.apache.doris.flink.cfg;
+
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
+ implements CheckpointedFunction {
+
+ private final DorisDynamicOutputFormat outputFormat;
+
+ public GenericDorisSinkFunction(@Nonnull DorisDynamicOutputFormat
outputFormat) {
+ this.outputFormat = Preconditions.checkNotNull(outputFormat);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ RuntimeContext ctx = getRuntimeContext();
+ outputFormat.setRuntimeContext(ctx);
+ outputFormat.open(ctx.getIndexOfThisSubtask(),
ctx.getNumberOfParallelSubtasks());
+ }
+
+
+ @Override
+ public void invoke(T value, Context context) throws Exception {
+ outputFormat.writeRecord(value);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws
Exception {
+
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws
Exception {
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ outputFormat.close();
+ super.close();
+ }
+
+}
\ No newline at end of file
diff --git
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 73c68b6..0fd154a 100644
---
a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++
b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -34,10 +34,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Arrays;
import java.util.StringJoiner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -50,7 +50,7 @@ import static
org.apache.flink.table.data.RowData.createFieldGetter;
/**
* DorisDynamicOutputFormat
**/
-public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
+public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
private static final Logger LOG =
LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -88,8 +88,10 @@ public class DorisDynamicOutputFormat extends
RichOutputFormat<RowData> {
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
- this.fieldDelimiter =
executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
- this.lineDelimiter =
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
+ this.fieldDelimiter =
executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
+ FIELD_DELIMITER_DEFAULT);
+ this.lineDelimiter =
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
+ LINE_DELIMITER_DEFAULT);
this.fieldNames = fieldNames;
this.jsonFormat =
FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -98,6 +100,7 @@ public class DorisDynamicOutputFormat extends
RichOutputFormat<RowData> {
}
}
+
@Override
public void configure(Configuration configuration) {
}
@@ -105,16 +108,17 @@ public class DorisDynamicOutputFormat extends
RichOutputFormat<RowData> {
@Override
public void open(int taskNumber, int numTasks) throws IOException {
dorisStreamLoad = new DorisStreamLoad(
- getBackend(),
- options.getTableIdentifier().split("\\.")[0],
- options.getTableIdentifier().split("\\.")[1],
- options.getUsername(),
- options.getPassword(),
- executionOptions.getStreamLoadProp());
+ getBackend(),
+ options.getTableIdentifier().split("\\.")[0],
+ options.getTableIdentifier().split("\\.")[1],
+ options.getUsername(),
+ options.getPassword(),
+ executionOptions.getStreamLoadProp());
LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
- this.scheduler = Executors.newScheduledThreadPool(1, new
ExecutorThreadFactory("doris-streamload-output-format"));
+ this.scheduler = Executors.newScheduledThreadPool(1, new
ExecutorThreadFactory("doris-streamload-output" +
+ "-format"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() ->
{
synchronized (DorisDynamicOutputFormat.this) {
if (!closed) {
@@ -136,30 +140,37 @@ public class DorisDynamicOutputFormat extends
RichOutputFormat<RowData> {
}
@Override
- public synchronized void writeRecord(RowData row) throws IOException {
+ public synchronized void writeRecord(T row) throws IOException {
checkFlushException();
-
addBatch(row);
if (executionOptions.getBatchSize() > 0 && batch.size() >=
executionOptions.getBatchSize()) {
flush();
}
}
- private void addBatch(RowData row) {
- Map<String, String> valueMap = new HashMap<>();
- StringJoiner value = new StringJoiner(this.fieldDelimiter);
- for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
- Object field = fieldGetters[i].getFieldOrNull(row);
- if (jsonFormat) {
- String data = field != null ? field.toString() : null;
- valueMap.put(this.fieldNames[i], data);
- } else {
- String data = field != null ? field.toString() : NULL_VALUE;
- value.add(data);
+ private void addBatch(T row) {
+ if (row instanceof RowData) {
+ RowData rowData = (RowData) row;
+ Map<String, String> valueMap = new HashMap<>();
+ StringJoiner value = new StringJoiner(this.fieldDelimiter);
+ for (int i = 0; i < rowData.getArity() && i < fieldGetters.length;
++i) {
+ Object field = fieldGetters[i].getFieldOrNull(rowData);
+ if (jsonFormat) {
+ String data = field != null ? field.toString() : null;
+ valueMap.put(this.fieldNames[i], data);
+ } else {
+ String data = field != null ? field.toString() :
NULL_VALUE;
+ value.add(data);
+ }
}
+ Object data = jsonFormat ? valueMap : value.toString();
+ batch.add(data);
+
+ } else if (row instanceof String) {
+ batch.add(row);
+ } else {
+ throw new RuntimeException("The type of element should be
'RowData' or 'String' only.");
}
- Object data = jsonFormat ? valueMap : value.toString();
- batch.add(data);
}
@Override
@@ -189,7 +200,11 @@ public class DorisDynamicOutputFormat extends
RichOutputFormat<RowData> {
}
String result;
if (jsonFormat) {
- result = OBJECT_MAPPER.writeValueAsString(batch);
+ if (batch.get(0) instanceof String) {
+ result = batch.toString();
+ } else {
+ result = OBJECT_MAPPER.writeValueAsString(batch);
+ }
} else {
result = String.join(this.lineDelimiter, batch);
}
@@ -292,11 +307,11 @@ public class DorisDynamicOutputFormat extends
RichOutputFormat<RowData> {
public DorisDynamicOutputFormat build() {
final LogicalType[] logicalTypes =
- Arrays.stream(fieldDataTypes)
- .map(DataType::getLogicalType)
- .toArray(LogicalType[]::new);
+ Arrays.stream(fieldDataTypes)
+ .map(DataType::getLogicalType)
+ .toArray(LogicalType[]::new);
return new DorisDynamicOutputFormat(
- optionsBuilder.build(), readOptions, executionOptions,
logicalTypes, fieldNames
+ optionsBuilder.build(), readOptions, executionOptions,
logicalTypes, fieldNames
);
}
}
diff --git
a/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
b/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
new file mode 100644
index 0000000..a64e3d9
--- /dev/null
+++
b/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.IOException;
+
+/**
+ * example using {@link DorisDynamicOutputFormat} for batching.
+ */
+public class DorisOutPutFormatExample {
+
+ public static void main(String[] args) throws Exception {
+
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ MapOperator<String, RowData> data = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0,
StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+ DorisOptions dorisOptions = DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build();
+ DorisReadOptions readOptions = DorisReadOptions.defaults();
+ DorisExecutionOptions executionOptions =
DorisExecutionOptions.defaults();
+
+ LogicalType[] types = {new VarCharType(), new DoubleType(), new
DoubleType()};
+ String[] fiels = {"city", "longitude", "latitude"};
+
+ DorisDynamicOutputFormat outputFormat =
+ new DorisDynamicOutputFormat(dorisOptions, readOptions,
executionOptions, types, fiels);
+
+ try {
+ outputFormat.open(0, 1);
+ data.output(outputFormat);
+ outputFormat.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ env.execute("doris batch sink example");
+
+
+ }
+
+}
diff --git
a/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
b/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
new file mode 100644
index 0000000..d37fd0d
--- /dev/null
+++
b/extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
@@ -0,0 +1,219 @@
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.cfg.DorisSink;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Properties;
+
+/**
+ * example using {@link DorisSink} for streaming.
+ */
+public class DorisStreamSinkExample {
+
+
+ public void testJsonString() throws Exception {
+ /*
+ * Example for JsonString element
+ */
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ Properties pro = new Properties();
+ pro.setProperty("format", "json");
+ pro.setProperty("strip_outer_array", "true");
+ env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0l)
+ .setMaxRetries(3)
+ .setStreamLoadProp(pro).build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+ env.execute("doris stream sink example");
+ }
+
+
+ public void testJsonStringWithDefaultReadOptions() throws Exception {
+ /*
+ * Example for JsonString element with default ReadOptions
+ */
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ Properties pro = new Properties();
+ pro.setProperty("format", "json");
+ pro.setProperty("strip_outer_array", "true");
+ env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0l)
+ .setMaxRetries(3)
+ .setStreamLoadProp(pro).build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+ env.execute("doris stream sink example");
+ }
+
+
+ public void testJsonStringWithDefaultReadOptionsAndExecutionOptions()
throws Exception {
+ /*
+ * Example for JsonString element with default ReadOptions and
ExecutionOptions
+ */
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\",
\"latitude\": \"39.916927\"}")
+ .addSink(
+ DorisSink.sink(
+ DorisOptions.builder()
+ .setFenodes("192.168.52.101:8030")
+
.setTableIdentifier("smarttrip_db.doris_output_format")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+ env.execute("doris stream sink example");
+ }
+
+
+ public void testRowData() throws Exception {
+ /*
+ * Example for RowData element
+ */
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<RowData> source = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0,
StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+ String[] fields = {"city", "longitude", "latitude"};
+ LogicalType[] types = {new VarCharType(), new DoubleType(), new
DoubleType()};
+
+ source.addSink(
+ DorisSink.sink(
+ fields,
+ types,
+ DorisReadOptions.builder().build(),
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0L)
+ .setMaxRetries(3)
+ .build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+ env.execute("doris stream sink example");
+ }
+
+
+ public void testRowDataWithDefaultReadOptions() throws Exception {
+ /*
+ * Example for RowData element with default ReadOptions
+ */
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<RowData> source = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0,
StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+ String[] fields = {"city", "longitude", "latitude"};
+ LogicalType[] types = {new VarCharType(), new DoubleType(), new
DoubleType()};
+
+ source.addSink(
+ DorisSink.sink(
+ fields,
+ types,
+ DorisExecutionOptions.builder()
+ .setBatchSize(3)
+ .setBatchIntervalMs(0L)
+ .setMaxRetries(3)
+ .build(),
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+ env.execute("doris stream sink example");
+ }
+
+
+ public void testRowDataWithDefaultReadOptionsAndExecutionOptions() throws
Exception {
+ /*
+ * Example for RowData element with default ReadOptions and
ExecutionOptions
+ */
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ DataStream<RowData> source = env.fromElements("")
+ .map(new MapFunction<String, RowData>() {
+ @Override
+ public RowData map(String value) throws Exception {
+ GenericRowData genericRowData = new GenericRowData(3);
+ genericRowData.setField(0,
StringData.fromString("北京"));
+ genericRowData.setField(1, 116.405419);
+ genericRowData.setField(2, 39.916927);
+ return genericRowData;
+ }
+ });
+
+ String[] fields = {"city", "longitude", "latitude"};
+ LogicalType[] types = {new VarCharType(), new DoubleType(), new
DoubleType()};
+
+ source.addSink(
+ DorisSink.sink(
+ fields,
+ types,
+ DorisOptions.builder()
+ .setFenodes("FE_IP:8030")
+ .setTableIdentifier("db.table")
+ .setUsername("root")
+ .setPassword("").build()
+ ));
+ env.execute("doris stream sink example");
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]