This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 57c04c8f5 [INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
57c04c8f5 is described below
commit 57c04c8f5f9bb26206bb1be2fbd61a9e9dd307b2
Author: Yizhou Yang <[email protected]>
AuthorDate: Wed Mar 1 14:24:13 2023 +0800
[INLONG-7392][Sort] Refactor Doris single table to solve performance issues (#7439)
---
.../table/DorisDynamicSchemaOutputFormat.java | 33 +++++------------
.../inlong/sort/doris/util/DorisParseUtils.java | 41 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 25 deletions(-)
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 84f573334..ebc8cfca2 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -149,6 +149,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
private final LogicalType[] logicalTypes;
private final DirtyOptions dirtyOptions;
private @Nullable final DirtySink<Object> dirtySink;
+ private transient Schema schema;
public DorisDynamicSchemaOutputFormat(DorisOptions option,
DorisReadOptions readOptions,
@@ -191,15 +192,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
return new DorisDynamicSchemaOutputFormat.Builder();
}
- private String parseKeysType() {
- try {
- Schema schema = RestService.getSchema(options, readOptions, LOG);
- return schema.getKeysType();
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris table schema: " +
options.getTableIdentifier());
- }
- }
-
private void handleStreamLoadProp() {
Properties props = executionOptions.getStreamLoadProp();
boolean ifEscape =
Boolean.parseBoolean(props.getProperty(ESCAPE_DELIMITERS_KEY,
ESCAPE_DELIMITERS_DEFAULT));
@@ -230,12 +222,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
if (multipleSink) {
return executionOptions.getEnableDelete();
}
- try {
- Schema schema = RestService.getSchema(options, readOptions, LOG);
- return executionOptions.getEnableDelete() ||
UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
- } catch (DorisException e) {
- throw new RuntimeException("Failed fetch doris single table
schema: " + options.getTableIdentifier(), e);
- }
+ return executionOptions.getEnableDelete() ||
UNIQUE_KEYS_TYPE.equals(schema.getKeysType());
}
@Override
@@ -251,16 +238,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
handleStreamLoadProp();
this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
for (int i = 0; i < logicalTypes.length; i++) {
- fieldGetters[i] = RowData.createFieldGetter(logicalTypes[i],
i);
- if ("DATE".equalsIgnoreCase(logicalTypes[i].toString())) {
- int finalI = i;
- fieldGetters[i] = row -> {
- if (row.isNullAt(finalI)) {
- return null;
- }
- return DorisParseUtils.epochToDate(row.getInt(finalI));
- };
- }
+ fieldGetters[i] =
DorisParseUtils.createFieldGetter(logicalTypes[i], i);
+ }
+ try {
+ schema = RestService.getSchema(options, readOptions, LOG);
+ } catch (DorisException e) {
+ throw new RuntimeException(e);
}
}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
index d5cc32169..6428df94b 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/util/DorisParseUtils.java
@@ -17,6 +17,9 @@
package org.apache.inlong.sort.doris.util;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import java.time.LocalDate;
@@ -49,6 +52,28 @@ public class DorisParseUtils {
}
}
+    /**
+     * Creates a {@link FieldGetter} for field {@code pos}, converting DATE fields via epochToDate.
+     * @param type the logical type of the field; DATE is matched by type root, so NOT NULL DATE
+     *             columns are converted as well as nullable ones
+     * @param pos the index of the field within the row
+     * @return the fieldGetter created
+     */
+    public static FieldGetter createFieldGetter(LogicalType type, int pos) {
+        FieldGetter getter;
+        // toString() renders a non-nullable date as "DATE NOT NULL", so compare the type root name
+        if (LogicalTypeEnum.DATE.getType().equals(type.getTypeRoot().name())) {
+            getter = row -> {
+                if (row.isNullAt(pos)) {
+                    return null;
+                }
+                return DorisParseUtils.epochToDate(row.getInt(pos));
+            };
+        } else {
+            getter = RowData.createFieldGetter(type, pos);
+        }
+        return getter;
+    }
+
/**
* A utility used to parse a string according to the given hexadecimal
escape sequence.
* <p/>
@@ -88,4 +113,20 @@ public class DorisParseUtils {
throw new IllegalArgumentException(
"Convert to LocalDate failed from unexpected value '" + obj +
"' of type " + obj.getClass().getName());
}
+
+    private enum LogicalTypeEnum {
+
+        DATE("DATE");
+
+        /** Upper-case name of the logical type represented by the constant. */
+        private final String typeName;
+
+        LogicalTypeEnum(String name) {
+            typeName = name;
+        }
+
+        /** Returns the type name, e.g. "DATE". */
+        public String getType() { return typeName; }
+    }
+
}