This is an automated email from the ASF dual-hosted git repository.
vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ebfe3711cb [INLONG-9758][Sort] StarRocks connector support state key
when initializing (#9759)
ebfe3711cb is described below
commit ebfe3711cb120c6a36e448f24b1c89b2cb766c9b
Author: vernedeng <[email protected]>
AuthorDate: Sun Mar 3 11:25:17 2024 +0800
[INLONG-9758][Sort] StarRocks connector support state key when initializing
(#9759)
* [INLONG-9758][Sort] StarRocks connector support state key when
initializing
* fix checkstyle
---
.../sink/table/StarRocksDynamicSinkFunctionV2.java | 19 +++++++++++++++++--
1 file changed, 17 insertions(+), 2 deletions(-)
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
index 01c50b9015..9df5f0e422 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -55,6 +55,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.NestedRowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,6 +97,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
private String auditHostAndPorts;
private String auditKeys;
private SchemaUtils schemaUtils;
+ private String stateKey;
public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
TableSchema schema,
@@ -120,6 +122,15 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
sinkOptions.getSemantic() ==
StarRocksSinkSemantic.AT_LEAST_ONCE);
}
+ public StarRocksDynamicSinkFunctionV2(
+ StarRocksSinkOptions sinkOptions,
+ TableSchema schema,
+ StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
+ String auditHostAndPorts, String auditKeys, String stateKey) {
+ this(sinkOptions, schema, rowTransformer, inlongMetric,
auditHostAndPorts, auditKeys);
+ this.stateKey = stateKey;
+ }
+
@Override
public void invoke(T value, Context context)
throws IOException, ClassNotFoundException, JSQLParserException {
@@ -291,19 +302,23 @@ public class StarRocksDynamicSinkFunctionV2<T> extends
StarRocksDynamicSinkFunct
return;
}
+ String transactionStateName = "starrocks-sink-transaction"
+ + (StringUtils.isNullOrWhitespaceOnly(stateKey) ? "" : "-" +
stateKey);
ListStateDescriptor<byte[]> descriptor =
new ListStateDescriptor<>(
- "starrocks-sink-transaction",
+ transactionStateName,
TypeInformation.of(new TypeHint<byte[]>() {
}));
ListState<byte[]> listState =
functionInitializationContext.getOperatorStateStore().getListState(descriptor);
snapshotStates = new SimpleVersionedListState<>(listState, new
StarRocksVersionedSerializer());
+ String legacyStateName = "buffered-rows"
+ + (StringUtils.isNullOrWhitespaceOnly(stateKey) ? "" : "-" +
stateKey);
// old version
ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>>
legacyDescriptor =
new ListStateDescriptor<>(
- "buffered-rows",
+ legacyStateName,
TypeInformation.of(new TypeHint<Map<String,
StarRocksSinkBufferEntity>>() {
}));
legacyState =
functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);