e-mhui commented on code in PR #6541:
URL: https://github.com/apache/inlong/pull/6541#discussion_r1031010249
##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -144,8 +194,49 @@ public static DorisDynamicSchemaOutputFormat.Builder
builder() {
return new DorisDynamicSchemaOutputFormat.Builder();
}
+ private String parseKeysType() {
+ try {
+ Schema schema = RestService.getSchema(options, readOptions, LOG);
+ return schema.getKeysType();
+ } catch (DorisException e) {
+ throw new RuntimeException("Failed fetch doris table schema: " +
options.getTableIdentifier());
+ }
+ }
+
+ private void handleStreamLoadProp() {
+ Properties props = executionOptions.getStreamLoadProp();
+ boolean ifEscape =
Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY,
ESCAPE_DELIMITERS_DEFAULT));
+ this.fieldDelimiter = props.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT);
+ this.lineDelimiter = props.getProperty(LINE_DELIMITER_KEY,
LINE_DELIMITER_DEFAULT);
+ if (ifEscape) {
+ this.fieldDelimiter = DorisParseUtils.escapeString(fieldDelimiter);
+ this.lineDelimiter = DorisParseUtils.escapeString(lineDelimiter);
+ props.remove(ESCAPE_DELIMITERS_KEY);
+ }
+
+ //add column key when fieldNames is not empty
+ if (!props.containsKey(COLUMNS_KEY) && fieldNames != null &&
fieldNames.length > 0) {
+ String columns = String.join(",", Arrays.stream(fieldNames)
+ .map(item -> String.format("`%s`",
item.trim().replace("`", "")))
+ .collect(Collectors.toList()));
+ props.put(COLUMNS_KEY, columns);
+ }
+
+ // if enable batch delete, the columns must add tag
'__DORIS_DELETE_SIGN__'
+ String columns = (String) props.get(COLUMNS_KEY);
+ if (!columns.contains(DORIS_DELETE_SIGN)
Review Comment:
Please write this condition on a single line.
##########
inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java:
##########
@@ -159,8 +257,28 @@ public void open(int taskNumber, int numTasks) throws
IOException {
options.getUsername(),
options.getPassword(),
executionOptions.getStreamLoadProp());
- jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
- DynamicSchemaFormatFactory.getFormat(dynamicSchemaFormat);
+ if (!multipleSink) {
+ Properties loadProperties = executionOptions.getStreamLoadProp();
Review Comment:
It would be better to move the columns-processing code into
`handleStreamLoadProp()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]