This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push:
new 3a18de731 [INLONG-6016][Sort] Serialization failed when submit load
DLC sort job (#6017)
3a18de731 is described below
commit 3a18de73190b052ea8c9ac72e52f078313677135
Author: thesumery <[email protected]>
AuthorDate: Mon Sep 26 16:41:20 2022 +0800
[INLONG-6016][Sort] Serialization failed when submit load DLC sort job
(#6017)
Co-authored-by: thesumery <[email protected]>
---
.../org/apache/inlong/sort/base/metric/MetricOption.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index 8cf0d6f01..d6304f1ba 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -33,7 +34,8 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
import static org.apache.inlong.sort.base.Constants.GROUP_ID;
import static org.apache.inlong.sort.base.Constants.STREAM_ID;
-public class MetricOption {
+public class MetricOption implements Serializable {
+ private static final long serialVersionUID = 1L;
private static final String IP_OR_HOST_PORT =
"^(.*):([0-9]|[1-9]\\d|[1-9]\\d{"
+ "2}|[1-9]\\d{"
+ "3}|[1-5]\\d{"
@@ -43,7 +45,7 @@ public class MetricOption {
private Map<String, String> labels;
private final HashSet<String> ipPortList;
- private Optional<String> ipPorts;
+ private String ipPorts;
private RegisteredMetric registeredMetric;
private long initRecords;
private long initBytes;
@@ -70,8 +72,8 @@ public class MetricOption {
});
this.ipPortList = new HashSet<>();
- this.ipPorts = Optional.ofNullable(inlongAudit);
- if (ipPorts.isPresent()) {
+ this.ipPorts = inlongAudit;
+ if (ipPorts != null) {
Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
"groupId and streamId must be set when enable inlong audit collect.");
String[] ipPortStrs = inlongAudit.split(DELIMITER);
@@ -96,7 +98,7 @@ public class MetricOption {
}
public Optional<String> getIpPorts() {
- return ipPorts;
+ return Optional.ofNullable(ipPorts);
}
public RegisteredMetric getRegisteredMetric() {