This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 2f10b5e563 [INLONG-11520][SDK] Remove DirtyServerType, use SinkType
(#11521)
2f10b5e563 is described below
commit 2f10b5e563bddce1f594b32182debc3cddda72b3
Author: vernedeng <[email protected]>
AuthorDate: Sun Nov 24 11:56:44 2024 +0800
[INLONG-11520][SDK] Remove DirtyServerType, use SinkType (#11521)
---
.../apache/inlong/common/constant/SinkType.java | 2 ++
.../inlong/sdk/dirtydata/DirtyMessageWrapper.java | 3 +-
.../apache/inlong/sort/base/dirty/DirtyData.java | 11 +++----
.../sort/base/dirty/sink/DirtyServerType.java | 37 ----------------------
.../base/dirty/sink/sdk/InlongSdkDirtySink.java | 2 +-
.../DynamicTubeMQTableDeserializationSchema.java | 4 +--
6 files changed, 12 insertions(+), 47 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
index c2816baf1e..f3eb862d87 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/constant/SinkType.java
@@ -19,6 +19,8 @@ package org.apache.inlong.common.constant;
public class SinkType {
+ public static final String ICEBERG = "ICEBERG";
+ public static final String HIVE = "HIVE";
public static final String KAFKA = "KAFKA";
public static final String PULSAR = "PULSAR";
public static final String CLS = "CLS";
diff --git
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
index a82d574cac..a22c4249b0 100644
---
a/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
+++
b/inlong-sdk/dirty-data-sdk/src/main/java/org/apache/inlong/sdk/dirtydata/DirtyMessageWrapper.java
@@ -34,7 +34,8 @@ import java.util.StringJoiner;
public class DirtyMessageWrapper {
private static DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private String delimiter;
+ @Builder.Default
+ private String delimiter = "|";
@Builder.Default
@Getter
private int retryTimes = 0;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index 24c5dddecd..7670879925 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -17,7 +17,6 @@
package org.apache.inlong.sort.base.dirty;
-import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.util.PatternReplaceUtils;
import org.apache.flink.table.types.logical.LogicalType;
@@ -65,7 +64,7 @@ public class DirtyData<T> {
*/
private final DirtyType dirtyType;
- private final DirtyServerType serverType;
+ private final String serverType;
/**
* Dirty describe message, it is the cause of dirty data
*/
@@ -88,7 +87,7 @@ public class DirtyData<T> {
private final T data;
public DirtyData(T data, String identifier, String labels,
- String logTag, DirtyType dirtyType, DirtyServerType serverType,
String dirtyMessage,
+ String logTag, DirtyType dirtyType, String serverType, String
dirtyMessage,
@Nullable LogicalType rowType, long dataTime, String extParams) {
this.data = data;
this.dirtyType = dirtyType;
@@ -131,7 +130,7 @@ public class DirtyData<T> {
return dirtyType;
}
- public DirtyServerType getServerType() {
+ public String getServerType() {
return serverType;
}
@@ -162,7 +161,7 @@ public class DirtyData<T> {
private String labels;
private String logTag;
private DirtyType dirtyType = DirtyType.UNDEFINED;
- private DirtyServerType serverType = DirtyServerType.UNDEFINED;
+ private String serverType;
private String dirtyMessage;
private LogicalType rowType;
private long dataTime;
@@ -184,7 +183,7 @@ public class DirtyData<T> {
return this;
}
- public Builder<T> setServerType(DirtyServerType serverType) {
+ public Builder<T> setServerType(String serverType) {
this.serverType = serverType;
return this;
}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
deleted file mode 100644
index 63f993c146..0000000000
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/DirtyServerType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.base.dirty.sink;
-
-public enum DirtyServerType {
-
- UNDEFINED("Undefined"),
- TUBE_MQ("TubeMQ"),
- ICEBERG("Iceberg")
-
- ;
-
- private final String format;
-
- DirtyServerType(String format) {
- this.format = format;
- }
-
- public String format() {
- return format;
- }
-}
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
index 8513f841bc..8e692a4c10 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/sdk/InlongSdkDirtySink.java
@@ -67,7 +67,7 @@ public class InlongSdkDirtySink<T> implements DirtySink<T> {
.inlongStreamId(dataStreamId)
.dataflowId(dataflowId)
.dataTime(dirtyData.getDataTime())
- .serverType(dirtyData.getServerType().format())
+ .serverType(dirtyData.getServerType())
.dirtyType(dirtyData.getDirtyType().format())
.dirtyMessage(dirtyData.getDirtyMessage())
.ext(dirtyData.getExtParams())
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index 94631e7cd3..2749213096 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -17,10 +17,10 @@
package org.apache.inlong.sort.tubemq.table;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtyType;
-import org.apache.inlong.sort.base.dirty.sink.DirtyServerType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricsCollector;
@@ -144,7 +144,7 @@ public class DynamicTubeMQTableDeserializationSchema
implements DynamicTubeMQDes
builder.setData(message.getData())
.setDirtyType(DirtyType.KEY_DESERIALIZE_ERROR)
- .setServerType(DirtyServerType.TUBE_MQ)
+ .setServerType(MQType.TUBEMQ)
.setDirtyDataTime(dataTime)
.setExtParams(message.getAttribute())
.setLabels(dirtyOptions.getLabels())