pengxiangyu commented on a change in pull request #6243:
URL: https://github.com/apache/incubator-doris/pull/6243#discussion_r672106795
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
##########
@@ -234,27 +239,39 @@ private DorisReadOptions
getDorisReadOptions(ReadableConfig readableConfig) {
return builder.build();
}
- private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig
readableConfig) {
+ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig
readableConfig,Properties streamLoadProp) {
Review comment:
A space is needed after the comma, before `Properties`.
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
##########
@@ -54,15 +57,17 @@
private String db;
private String tbl;
private String authEncoding;
+ private Properties streamLoadProp;
- public DorisStreamLoad(String hostPort, String db, String tbl, String
user, String passwd) {
+ public DorisStreamLoad(String hostPort, String db, String tbl, String
user, String passwd,Properties streamLoadProp) {
Review comment:
Missing space after the comma, before `Properties streamLoadProp`.
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
##########
@@ -234,27 +239,39 @@ private DorisReadOptions
getDorisReadOptions(ReadableConfig readableConfig) {
return builder.build();
}
- private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig
readableConfig) {
+ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig
readableConfig,Properties streamLoadProp) {
final DorisExecutionOptions.Builder builder =
DorisExecutionOptions.builder();
builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+ builder.setStreamLoadProp(streamLoadProp);
return builder.build();
}
+ private Properties getStreamLoadProp(Map<String, String> tableOptions){
+ final Properties streamLoadProp = new Properties();
+
+ for(Map.Entry<String,String> entry : tableOptions.entrySet()){
+ if(entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)){
+ String subKey =
entry.getKey().substring((STREAM_LOAD_PROP_PREFIX).length());
Review comment:
The parentheses around `STREAM_LOAD_PROP_PREFIX` are unnecessary.
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
##########
@@ -234,27 +239,39 @@ private DorisReadOptions
getDorisReadOptions(ReadableConfig readableConfig) {
return builder.build();
}
- private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig
readableConfig) {
+ private DorisExecutionOptions getDorisExecutionOptions(ReadableConfig
readableConfig,Properties streamLoadProp) {
final DorisExecutionOptions.Builder builder =
DorisExecutionOptions.builder();
builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
+ builder.setStreamLoadProp(streamLoadProp);
return builder.build();
}
+ private Properties getStreamLoadProp(Map<String, String> tableOptions){
+ final Properties streamLoadProp = new Properties();
+
+ for(Map.Entry<String,String> entry : tableOptions.entrySet()){
+ if(entry.getKey().startsWith(STREAM_LOAD_PROP_PREFIX)){
+ String subKey =
entry.getKey().substring((STREAM_LOAD_PROP_PREFIX).length());
+ streamLoadProp.put(subKey, entry.getValue());
+ }
+ }
+ return streamLoadProp;
+ }
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
- // either implement your custom validation logic here ...
- // or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
// validate all options
- helper.validate();
+ helper.validateExcept(STREAM_LOAD_PROP_PREFIX);
+
+ Properties streamLoadProp =
getStreamLoadProp(context.getCatalogTable().getOptions());
// create and return dynamic table source
return new DorisDynamicTableSink(
getDorisOptions(helper.getOptions()),
getDorisReadOptions(helper.getOptions()),
- getDorisExecutionOptions(helper.getOptions())
+
getDorisExecutionOptions(helper.getOptions(),streamLoadProp)
Review comment:
A space is needed after the comma, before `streamLoadProp`.
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
##########
@@ -66,21 +74,23 @@ public DorisDynamicOutputFormat(DorisOptions
option,DorisReadOptions readOptions
this.options = option;
this.readOptions = readOptions;
this.executionOptions = executionOptions;
+ this.fieldDelimiter =
executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,FIELD_DELIMITER_DEFAULT);
+ this.lineDelimiter =
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,LINE_DELIMITER_DEFAULT);
Review comment:
Missing space after the comma in both `getProperty(...)` argument lists.
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
##########
@@ -133,11 +141,15 @@ public void load(String value) throws StreamLoadException
{
}
private LoadResponse loadBatch(String value) {
- Calendar calendar = Calendar.getInstance();
- String label =
String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
- calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1,
calendar.get(Calendar.DAY_OF_MONTH),
- calendar.get(Calendar.HOUR_OF_DAY),
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
- UUID.randomUUID().toString().replaceAll("-", ""));
+ String label = streamLoadProp.getProperty("label");
+ if(StringUtils.isBlank(label)){
+ Calendar calendar = Calendar.getInstance();
+ label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
Review comment:
SimpleDateFormat is easier to use and read.
##########
File path:
extension/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
##########
@@ -89,6 +94,9 @@ private HttpURLConnection getConnection(String urlStr, String
label) throws IOEx
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
conn.addRequestProperty("label", label);
+ for(Map.Entry<Object, Object> entry : streamLoadProp.entrySet()){
+
conn.addRequestProperty(String.valueOf(entry.getKey()),String.valueOf(entry.getValue()));
Review comment:
Missing space after the comma between the two `String.valueOf(...)` arguments.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]