ruanwenjun commented on a change in pull request #1452:
URL:
https://github.com/apache/incubator-seatunnel/pull/1452#discussion_r829151930
##########
File path:
seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/sink/DruidOutputFormat.java
##########
@@ -64,25 +65,29 @@
private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
private static final String DEFAULT_TIMESTAMP_FORMAT = "auto";
private static final DateTime DEFAULT_TIMESTAMP_MISSING_VALUE = null;
+ private static final long DEFAULT_BATCH_SIZE = 1024;
- private final transient StringBuffer data;
+ private final StringBuffer data = new StringBuffer();
Review comment:
@asdf2014 When we use `DataStream#writeUsingOutputFormat(OutputFormat<T>
format)`, we can see the output function is wrapper into a `SinkFunction`.
```java
@Deprecated
@PublicEvolving
public DataStreamSink<T> writeUsingOutputFormat(OutputFormat<T> format) {
return addSink(new OutputFormatSinkFunction<>(format));
}
```
The `SinkFunction#invoke(IN record)` will be triggered when receive record.
It will use the format function to write the record.
```java
@Override
public void invoke(IN record) throws Exception {
try {
format.writeRecord(record);
} catch (Exception ex) {
cleanup();
throw ex;
}
}
```
Unfortunately, I didn't find the doc about the `SinkFunction` from Flink
website, I just remember that.
We can get some information from the [email
thread](https://lists.apache.org/thread/hnqzz02jrgpkx3y0lrcrtf8r5o724tl9) and
[stack
overflow](https://stackoverflow.com/questions/70075452/apache-flink-sinkfunction-requirement#:~:text=All%20user%2Ddefined%20functions%20in,your%20Sink%20function%20is%20safe.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]