This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 89e2fcad95 [INLONG-8839][Sort] Add audit metric in starrocks connector on flink 1.15 (#8894)
89e2fcad95 is described below

commit 89e2fcad959fb8e50904d0e6310d9de12baa1263
Author: Sting <[email protected]>
AuthorDate: Mon Sep 18 11:09:40 2023 +0800

    [INLONG-8839][Sort] Add audit metric in starrocks connector on flink 1.15 (#8894)
---
 .../src/test/resources/flinkSql/postgres_test.sql  |   6 +-
 .../inlong/sort/base/metric/SinkMetricData.java    |  42 ++-
 .../sort-connectors/starrocks/pom.xml              |   6 +
 .../sink/StarRocksDynamicTableSinkFactory.java     |  13 +-
 .../table/sink/table/SinkFunctionFactory.java      | 169 ++++++++++
 .../sink/table/StarRocksDynamicSinkFunctionV2.java | 374 +++++++++++++++++++++
 .../sink/table/StarRocksDynamicTableSink.java      |  81 +++++
 .../starrocks/table/sink/utils/SchemaUtils.java    |  93 +++++
 licenses/inlong-sort-connectors/LICENSE            |   5 +-
 9 files changed, 771 insertions(+), 18 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
index 64bfe3f606..7b9957a0fa 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
@@ -28,9 +28,9 @@ CREATE TABLE test_output1 (
     'table-name' = 'test_output1',
     'username' = 'inlong',
     'password' = 'inlong',
-    'sink.buffer-flush.interval-ms' = '5000',
-    'sink.properties.column_separator' = '\x01',
-    'sink.properties.row_delimiter' = '\x02'
+    'sink.properties.format' = 'json',
+    'sink.properties.strip_outer_array' = 'true',
+    'sink.buffer-flush.interval-ms' = '1000'
 );
 
 INSERT INTO test_output1 select * from test_input1;
diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index b8867ad847..fa2aee68d3 100644
--- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.sort.base.metric;
 
 import org.apache.inlong.audit.AuditOperator;
-import org.apache.inlong.sort.base.Constants;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 
 import org.apache.flink.metrics.Counter;
@@ -26,6 +25,7 @@ import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
@@ -55,6 +55,7 @@ public class SinkMetricData implements MetricData {
     private Counter dirtyBytesOut;
     private Meter numRecordsOutPerSecond;
     private Meter numBytesOutPerSecond;
+    private List<Integer> auditKeys;
 
     public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
         this.metricGroup = metricGroup;
@@ -100,6 +101,7 @@ public class SinkMetricData implements MetricData {
         if (option.getIpPorts().isPresent()) {
             AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
             this.auditOperator = AuditOperator.getInstance();
+            this.auditKeys = option.getInlongAuditKeys();
         }
     }
 
@@ -257,11 +259,39 @@ public class SinkMetricData implements MetricData {
         invoke(1, getDataSize(o));
     }
 
+    public void invokeWithEstimate(Object o, long dataTime) {
+        invoke(1, getDataSize(o), dataTime);
+    }
+
     public void invokeDirtyWithEstimate(Object o) {
         invokeDirty(1, getDataSize(o));
     }
 
     public void invoke(long rowCount, long rowSize) {
+        outputDefaultMetrics(rowCount, rowSize);
+        outputAuditMetrics(rowCount, rowSize, System.currentTimeMillis());
+    }
+
+    private void invoke(long rowCount, long rowSize, long dataTime) {
+        outputDefaultMetrics(rowCount, rowSize);
+        outputAuditMetrics(rowCount, rowSize, dataTime);
+    }
+
+    private void outputAuditMetrics(long rowCount, long rowSize, long 
dataTime) {
+        if (auditOperator != null) {
+            for (Integer key : auditKeys) {
+                auditOperator.add(
+                        key,
+                        getGroupId(),
+                        getStreamId(),
+                        dataTime,
+                        rowCount,
+                        rowSize);
+            }
+        }
+    }
+
+    private void outputDefaultMetrics(long rowCount, long rowSize) {
         if (numRecordsOut != null) {
             numRecordsOut.inc(rowCount);
         }
@@ -277,16 +307,6 @@ public class SinkMetricData implements MetricData {
         if (numBytesOutForMeter != null) {
             numBytesOutForMeter.inc(rowSize);
         }
-
-        if (auditOperator != null) {
-            auditOperator.add(
-                    Constants.AUDIT_SORT_OUTPUT,
-                    getGroupId(),
-                    getStreamId(),
-                    System.currentTimeMillis(),
-                    rowCount,
-                    rowSize);
-        }
     }
 
     public void invokeDirty(long rowCount, long rowSize) {
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
index 9fdda27057..c3412e0b00 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
@@ -45,6 +45,10 @@
             <artifactId>flink-connector-starrocks</artifactId>
             
<version>${starrocks-connector.version}_flink-${flink.starrocks.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+        </dependency>
     </dependencies>
 
     <build>
@@ -75,6 +79,8 @@
                                     
<include>org.apache.flink:flink-shaded-guava</include>
                                     <include>com.google.protobuf:*</include>
                                     
<include>com.starrocks:flink-connector-starrocks</include>
+                                    <include>com.alibaba:*</include>
+                                    
<include>com.github.jsqlparser:jsqlparser</include>
                                 </includes>
                             </artifactSet>
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
index 1f31cf507b..de764e7041 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
@@ -17,7 +17,8 @@
 
 package org.apache.inlong.sort.starrocks.table.sink;
 
-import com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSink;
+import 
org.apache.inlong.sort.starrocks.table.sink.table.StarRocksDynamicTableSink;
+
 import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
@@ -40,6 +41,9 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static 
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 
+/**
+ * Factory to create StarRocksDynamicTableSink.
+ */
 public class StarRocksDynamicTableSinkFactory implements 
DynamicTableSinkFactory {
 
     @Override
@@ -47,13 +51,17 @@ public class StarRocksDynamicTableSinkFactory implements DynamicTableSinkFactory
         final FactoryUtil.TableFactoryHelper helper = 
FactoryUtil.createTableFactoryHelper(this, context);
         helper.validateExcept(StarRocksSinkOptions.SINK_PROPERTIES_PREFIX, 
DIRTY_PREFIX);
         ReadableConfig options = helper.getOptions();
+        String inlongMetric = 
options.getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
+        String auditHostAndPorts = 
options.getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
+        String auditKeys = 
options.getOptional(AUDIT_KEYS).orElse(AUDIT_KEYS.defaultValue());
+
         // validate some special properties
         StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(options, 
context.getCatalogTable().getOptions());
         sinkOptions.enableUpsertDelete();
         TableSchema physicalSchema = 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 
         return new StarRocksDynamicTableSink(sinkOptions,
-                physicalSchema);
+                physicalSchema, inlongMetric, auditHostAndPorts, auditKeys);
     }
 
     @Override
@@ -93,7 +101,6 @@ public class StarRocksDynamicTableSinkFactory implements DynamicTableSinkFactory
         optionalOptions.add(INLONG_METRIC);
         optionalOptions.add(INLONG_AUDIT);
         optionalOptions.add(AUDIT_KEYS);
-
         return optionalOptions;
     }
 
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java
new file mode 100644
index 0000000000..2b7571d4cd
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
+import 
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.tools.ConnectionUtils;
+import com.starrocks.data.load.stream.StreamLoadConstants;
+import com.starrocks.data.load.stream.StreamLoadUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Create sink function according to the configuration. */
+public class SinkFunctionFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(
+            SinkFunctionFactory.class);
+
+    enum SinkVersion {
+        // Implement exactly-once using stream load which has a
+        // poor performance. All versions of StarRocks are supported
+        V1,
+        // Implement exactly-once using transaction load since StarRocks 2.4
+        V2,
+        // Select sink version automatically according to whether StarRocks
+        // supports transaction load
+        AUTO
+    }
+
+    public static boolean 
isStarRocksSupportTransactionLoad(StarRocksSinkOptions sinkOptions) {
+        String host = ConnectionUtils.selectAvailableHttpHost(
+                sinkOptions.getLoadUrlList(), sinkOptions.getConnectTimeout());
+        if (host == null) {
+            throw new RuntimeException("Can't find an available host in " + 
sinkOptions.getLoadUrlList());
+        }
+
+        String beginUrlStr = StreamLoadConstants.getBeginUrl(host);
+        HttpPost httpPost = new HttpPost(beginUrlStr);
+        httpPost.addHeader(HttpHeaders.AUTHORIZATION,
+                StreamLoadUtils.getBasicAuthHeader(sinkOptions.getUsername(), 
sinkOptions.getPassword()));
+        
httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
+        LOG.info("Transaction load probe post {}", httpPost);
+
+        HttpClientBuilder clientBuilder = HttpClients.custom()
+                .setRedirectStrategy(new DefaultRedirectStrategy() {
+
+                    @Override
+                    protected boolean isRedirectable(String method) {
+                        return true;
+                    }
+                });
+
+        try (CloseableHttpClient client = clientBuilder.build()) {
+            CloseableHttpResponse response = client.execute(httpPost);
+            String responseBody = EntityUtils.toString(response.getEntity());
+            LOG.info("Transaction load probe response {}", responseBody);
+
+            JSONObject bodyJson = JSON.parseObject(responseBody);
+            String status = bodyJson.getString("status");
+            String msg = bodyJson.getString("msg");
+
+            // If StarRocks does not support transaction load, FE's 
NotFoundAction#executePost
+            // will be called where you can know how the response json is 
constructed
+            if ("FAILED".equals(status) && "Not implemented".equals(msg)) {
+                return false;
+            }
+            return true;
+        } catch (IOException e) {
+            String errMsg = "Failed to probe transaction load for " + host;
+            LOG.warn("{}", errMsg, e);
+            throw new RuntimeException(errMsg, e);
+        }
+    }
+
+    public static void detectStarRocksFeature(StarRocksSinkOptions 
sinkOptions) {
+        try {
+            boolean supportTransactionLoad = 
isStarRocksSupportTransactionLoad(sinkOptions);
+            
sinkOptions.setSupportTransactionStreamLoad(supportTransactionLoad);
+            if (supportTransactionLoad) {
+                LOG.info("StarRocks supports transaction load");
+            } else {
+                LOG.info("StarRocks does not support transaction load");
+            }
+        } catch (Exception e) {
+            LOG.warn("Can't decide whether StarRocks supports transaction 
load, and enable it by default.");
+            sinkOptions.setSupportTransactionStreamLoad(true);
+        }
+    }
+
+    public static SinkFunctionFactory.SinkVersion 
chooseSinkVersionAutomatically(StarRocksSinkOptions sinkOptions) {
+        if 
(StarRocksSinkSemantic.AT_LEAST_ONCE.equals(sinkOptions.getSemantic())) {
+            LOG.info("Choose sink version V2 for at-least-once.");
+            return SinkVersion.V2;
+        }
+
+        if (sinkOptions.isSupportTransactionStreamLoad()) {
+            LOG.info("StarRocks supports transaction load, and choose sink 
version V2");
+            return SinkVersion.V2;
+        } else {
+            LOG.info("StarRocks does not support transaction load, and choose 
sink version V1");
+            return SinkVersion.V1;
+        }
+    }
+
+    public static SinkVersion getSinkVersion(StarRocksSinkOptions sinkOptions) 
{
+        String sinkTypeOption = 
sinkOptions.getSinkVersion().trim().toUpperCase();
+        SinkVersion sinkVersion;
+        if (SinkVersion.V1.name().equals(sinkTypeOption)) {
+            sinkVersion = SinkVersion.V1;
+        } else if (SinkVersion.V2.name().equals(sinkTypeOption)) {
+            sinkVersion = SinkVersion.V2;
+        } else if (SinkVersion.AUTO.name().equals(sinkTypeOption)) {
+            sinkVersion = chooseSinkVersionAutomatically(sinkOptions);
+        } else {
+            throw new UnsupportedOperationException("Unsupported sink type " + 
sinkTypeOption);
+        }
+        LOG.info("Choose sink version {}", sinkVersion.name());
+        return sinkVersion;
+    }
+
+    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(
+            StarRocksSinkOptions sinkOptions, TableSchema schema, 
StarRocksIRowTransformer<T> rowTransformer,
+            String inlongMetric, String auditHostAndPorts, String auditKeys) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        switch (sinkVersion) {
+            case V1:
+                return new StarRocksDynamicSinkFunction<>(sinkOptions, schema, 
rowTransformer);
+            case V2:
+                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions, 
schema, rowTransformer,
+                        inlongMetric, auditHostAndPorts, auditKeys);
+            default:
+                throw new UnsupportedOperationException("Unsupported sink type 
" + sinkVersion.name());
+        }
+    }
+
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
new file mode 100644
index 0000000000..70326a67a3
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.starrocks.table.sink.utils.SchemaUtils;
+
+import com.google.common.base.Strings;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.manager.StarRocksSinkManagerV2;
+import com.starrocks.connector.flink.manager.StarRocksSinkTable;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+import com.starrocks.connector.flink.table.data.StarRocksRowData;
+import 
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.table.sink.StarRocksVersionedSerializer;
+import com.starrocks.connector.flink.table.sink.StarrocksSnapshotState;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.truncate.Truncate;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.NestedRowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * StarRocks dynamic sink function. It supports insert, upsert, delete in 
Starrocks.
+ * @param <T>
+ */
+public class StarRocksDynamicSinkFunctionV2<T> extends 
StarRocksDynamicSinkFunctionBase<T> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger log = 
LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);
+
+    private final StarRocksSinkOptions sinkOptions;
+    private final StarRocksSinkManagerV2 sinkManager;
+    private final StarRocksISerializer serializer;
+    private final StarRocksIRowTransformer<T> rowTransformer;
+    private static final int NESTED_ROW_DATA_HEADER_SIZE = 256;
+
+    private transient volatile ListState<StarrocksSnapshotState> 
snapshotStates;
+    private final Map<Long, List<StreamLoadSnapshot>> snapshotMap = new 
ConcurrentHashMap<>();
+    private transient SinkMetricData sinkMetricData;
+
+    @Deprecated
+    private transient ListState<Map<String, StarRocksSinkBufferEntity>> 
legacyState;
+    @Deprecated
+    private transient List<StarRocksSinkBufferEntity> legacyData;
+    private String inlongMetric;
+    private String auditHostAndPorts;
+    private String auditKeys;
+    private SchemaUtils schemaUtils;
+
+    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
+            TableSchema schema,
+            StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
+            String auditHostAndPorts, String auditKeys) {
+        this.sinkOptions = sinkOptions;
+        this.rowTransformer = rowTransformer;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.inlongMetric = inlongMetric;
+        this.auditKeys = auditKeys;
+        StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
+                .sinkOptions(sinkOptions)
+                .build();
+        this.schemaUtils = new SchemaUtils(schema);
+        // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete 
which is decided in
+        // StarRocksSinkTable#validateTableStructure, so create serializer 
after validating table structure
+        this.serializer = 
StarRocksSerializerFactory.createSerializer(sinkOptions,
+                schemaUtils.filterOutTimeField(schema));
+        rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
+        rowTransformer.setTableSchema(schema);
+        this.sinkManager = new 
StarRocksSinkManagerV2(sinkOptions.getProperties(),
+                sinkOptions.getSemantic() == 
StarRocksSinkSemantic.AT_LEAST_ONCE);
+    }
+
+    @Override
+    public void invoke(T value, Context context)
+            throws IOException, ClassNotFoundException, JSQLParserException {
+
+        if (serializer == null) {
+            if (value instanceof StarRocksSinkRowDataWithMeta) {
+                StarRocksSinkRowDataWithMeta data = 
(StarRocksSinkRowDataWithMeta) value;
+                if (Strings.isNullOrEmpty(data.getDatabase())
+                        || Strings.isNullOrEmpty(data.getTable())
+                        || data.getDataRows() == null) {
+                    log.warn(String.format("json row data not fulfilled. 
{database: %s, table: %s, dataRows: %s}",
+                            data.getDatabase(), data.getTable(), 
Arrays.toString(data.getDataRows())));
+                    return;
+                }
+                sinkManager.write(null, data.getDatabase(), data.getTable(), 
data.getDataRows());
+                return;
+            } else if (value instanceof StarRocksRowData) {
+                StarRocksRowData data = (StarRocksRowData) value;
+                if (Strings.isNullOrEmpty(data.getDatabase())
+                        || Strings.isNullOrEmpty(data.getTable())
+                        || data.getRow() == null) {
+                    log.warn(String.format("json row data not fulfilled. 
{database: %s, table: %s, dataRows: %s}",
+                            data.getDatabase(), data.getTable(), 
data.getRow()));
+                    return;
+                }
+                sinkManager.write(data.getUniqueKey(), data.getDatabase(), 
data.getTable(), data.getRow());
+                return;
+            }
+            // raw data sink
+            sinkManager.write(null, sinkOptions.getDatabaseName(), 
sinkOptions.getTableName(), value.toString());
+            return;
+        }
+
+        if (value instanceof NestedRowData) {
+            NestedRowData ddlData = (NestedRowData) value;
+            if (ddlData.getSegments().length != 1 || 
ddlData.getSegments()[0].size() < NESTED_ROW_DATA_HEADER_SIZE) {
+                return;
+            }
+
+            int totalSize = ddlData.getSegments()[0].size();
+            byte[] data = new byte[totalSize - NESTED_ROW_DATA_HEADER_SIZE];
+            ddlData.getSegments()[0].get(NESTED_ROW_DATA_HEADER_SIZE, data);
+            Map<String, String> ddlMap = 
InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
+            if (ddlMap == null
+                    || "true".equals(ddlMap.get("snapshot"))
+                    || Strings.isNullOrEmpty(ddlMap.get("ddl"))
+                    || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
+                return;
+            }
+            Statement statement = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
+            if (statement instanceof Truncate) {
+                Truncate truncate = (Truncate) statement;
+                if 
(!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
+                    return;
+                }
+                // TODO: add ddl to queue
+            } else if (statement instanceof Alter) {
+
+            }
+        }
+        if (value instanceof RowData) {
+            if (RowKind.UPDATE_BEFORE.equals(((RowData) value).getRowKind())) {
+                // do not need update_before, cause an update action happened 
on the primary keys will be separated into
+                // `delete` and `create`
+                return;
+            }
+            if (!sinkOptions.supportUpsertDelete() && 
RowKind.DELETE.equals(((RowData) value).getRowKind())) {
+                // let go the UPDATE_AFTER and INSERT rows for tables who have 
a group of `unique` or `duplicate` keys.
+                return;
+            }
+        }
+
+        flushLegacyData();
+
+        Object[] data = rowTransformer.transform(value, 
sinkOptions.supportUpsertDelete());
+
+        ouputMetrics(value, data);
+
+        sinkManager.write(
+                null,
+                sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName(),
+                serializer.serialize(schemaUtils.filterOutTimeField(data)));
+
+    }
+
+    private void ouputMetrics(T value, Object[] data) {
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value, 
schemaUtils.getDataTime(data));
+        }
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        sinkManager.init();
+        sinkManager.setRuntimeContext(getRuntimeContext(), sinkOptions);
+        if (rowTransformer != null) {
+            rowTransformer.setRuntimeContext(getRuntimeContext());
+        }
+
+        MetricOption metricOption = 
MetricOption.builder().withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+        }
+
+        notifyCheckpointComplete(Long.MAX_VALUE);
+        log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
+    }
+
+    @Override
+    public void finish() {
+        sinkManager.flush();
+    }
+
+    @Override
+    public void close() {
+        try {
+            sinkManager.flush();
+        } catch (Exception e) {
+            log.error("Failed to flush when closing", e);
+            throw e;
+        } finally {
+            StreamLoadSnapshot snapshot = sinkManager.snapshot();
+            sinkManager.abort(snapshot);
+            sinkManager.close();
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) 
throws Exception {
+        sinkManager.flush();
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+            return;
+        }
+
+        StreamLoadSnapshot snapshot = sinkManager.snapshot();
+
+        if (sinkManager.prepare(snapshot)) {
+            snapshotMap.put(functionSnapshotContext.getCheckpointId(), 
Collections.singletonList(snapshot));
+
+            snapshotStates.clear();
+            snapshotStates.add(StarrocksSnapshotState.of(snapshotMap));
+        } else {
+            sinkManager.abort(snapshot);
+            throw new RuntimeException("Snapshot state failed by prepare");
+        }
+
+        if (legacyState != null) {
+            legacyState.clear();
+        }
+    }
+
+    /**
+     * Registers the operator states used with exactly-once semantic and, when the
+     * job is restored, reloads their content.
+     *
+     * <p>Two list states are registered: {@code "starrocks-sink-transaction"} for the
+     * versioned snapshot state and {@code "buffered-rows"} for the legacy buffer
+     * format. On restore, the snapshots are merged into {@code snapshotMap} per
+     * checkpoint id and the legacy buffer entities are collected into
+     * {@code legacyData}. Nothing is done for any other semantic.
+     *
+     * @param functionInitializationContext context giving access to the operator state store
+     * @throws Exception if the states cannot be registered or read
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext)
+            throws Exception {
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+            return;
+        }
+
+        // current format: serialized snapshot state
+        ListStateDescriptor<byte[]> transactionDescriptor =
+                new ListStateDescriptor<>(
+                        "starrocks-sink-transaction",
+                        TypeInformation.of(new TypeHint<byte[]>() {
+                        }));
+        ListState<byte[]> transactionState =
+                functionInitializationContext.getOperatorStateStore().getListState(transactionDescriptor);
+        snapshotStates = new SimpleVersionedListState<>(transactionState,
+                new StarRocksVersionedSerializer());
+
+        // old format: buffered rows
+        ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> bufferedRowsDescriptor =
+                new ListStateDescriptor<>(
+                        "buffered-rows",
+                        TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() {
+                        }));
+        legacyState =
+                functionInitializationContext.getOperatorStateStore().getListState(bufferedRowsDescriptor);
+
+        if (!functionInitializationContext.isRestored()) {
+            return;
+        }
+
+        // Merge every restored snapshot list into the in-memory map, keyed by checkpoint id.
+        for (StarrocksSnapshotState restored : snapshotStates.get()) {
+            for (Map.Entry<Long, List<StreamLoadSnapshot>> entry : restored.getData().entrySet()) {
+                snapshotMap.compute(entry.getKey(), (id, existing) -> {
+                    if (existing != null) {
+                        existing.addAll(entry.getValue());
+                        return existing;
+                    }
+                    return new ArrayList<>(entry.getValue());
+                });
+            }
+        }
+
+        legacyData = new ArrayList<>();
+        for (Map<String, StarRocksSinkBufferEntity> buffered : legacyState.get()) {
+            legacyData.addAll(buffered.values());
+        }
+        log.info("There are {} items from legacy state", legacyData.size());
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+
+        boolean succeed = true;
+
+        List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
+                .filter(cpId -> cpId <= checkpointId)
+                .sorted(Long::compare)
+                .collect(Collectors.toList());
+
+        for (Long cpId : commitCheckpointIds) {
+            try {
+                for (StreamLoadSnapshot snapshot : snapshotMap.get(cpId)) {
+                    if (!sinkManager.commit(snapshot)) {
+                        succeed = false;
+                        break;
+                    }
+                }
+
+                if (!succeed) {
+                    throw new RuntimeException(String.format("Failed to commit 
some transactions for snapshot %s, " +
+                            "please check taskmanager logs for details", 
cpId));
+                }
+            } catch (Exception e) {
+                log.error("Failed to notify checkpoint complete, checkpoint id 
: {}", checkpointId, e);
+                throw new RuntimeException("Failed to notify checkpoint 
complete for checkpoint id " + checkpointId, e);
+            }
+
+            snapshotMap.remove(cpId);
+        }
+
+        // set legacyState to null to avoid clear it in latter snapshotState
+        legacyState = null;
+    }
+
+    /**
+     * Writes the rows collected in {@code legacyData} through the sink manager and
+     * then empties the list. Does nothing when there is no legacy data.
+     *
+     * <p>Each buffered byte array is decoded as UTF-8 and written to the database
+     * and table recorded on its buffer entity.
+     */
+    private void flushLegacyData() {
+        if (legacyData == null || legacyData.isEmpty()) {
+            return;
+        }
+
+        for (StarRocksSinkBufferEntity entity : legacyData) {
+            for (byte[] data : entity.getBuffer()) {
+                sinkManager.write(null, entity.getDatabase(), entity.getTable(),
+                        new String(data, StandardCharsets.UTF_8));
+            }
+            // Arguments follow the placeholder order: record count, table, database.
+            log.info("Write {} legacy records from table '{}' of database '{}'",
+                    entity.getBuffer().size(), entity.getTable(), entity.getDatabase());
+        }
+        legacyData.clear();
+    }
+
+}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
new file mode 100644
index 0000000000..c5df06a3ab
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import 
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * This class is used to create a DynamicTableSink for Starrocks.
+ */
+public class StarRocksDynamicTableSink implements DynamicTableSink {
+
+    // All fields are assigned once in the constructor and never modified afterwards.
+    private final transient TableSchema flinkSchema;
+    private final StarRocksSinkOptions sinkOptions;
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final String auditKeys;
+
+    /**
+     * Creates the table sink.
+     *
+     * @param sinkOptions options of the StarRocks sink
+     * @param schema schema of the Flink table written by this sink
+     * @param inlongMetric InLong metric label string handed to the sink function
+     * @param auditHostAndPorts audit address handed to the sink function
+     * @param auditKeys audit keys handed to the sink function
+     */
+    public StarRocksDynamicTableSink(StarRocksSinkOptions sinkOptions, TableSchema schema,
+            String inlongMetric, String auditHostAndPorts, String auditKeys) {
+        this.flinkSchema = schema;
+        this.sinkOptions = sinkOptions;
+        this.inlongMetric = inlongMetric;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.auditKeys = auditKeys;
+    }
+
+    /**
+     * Accepts the changelog mode requested by the planner as is.
+     *
+     * @param requestedMode changelog mode requested by the planner
+     * @return {@code requestedMode}, unchanged
+     */
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        return requestedMode;
+    }
+
+    /**
+     * Builds the sink function through {@link SinkFunctionFactory} and wraps it in
+     * a {@link SinkFunctionProvider} using the parallelism from the sink options.
+     *
+     * @param context planner context used to create the row type information
+     * @return runtime provider of the StarRocks sink function
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+
+        final TypeInformation<RowData> rowDataTypeInfo =
+                context.createTypeInformation(flinkSchema.toRowDataType());
+        StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction =
+                SinkFunctionFactory.createSinkFunction(
+                        sinkOptions,
+                        flinkSchema,
+                        new StarRocksTableRowTransformer(rowDataTypeInfo),
+                        inlongMetric,
+                        auditHostAndPorts,
+                        auditKeys);
+        return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+
+    }
+
+    /**
+     * @return a new sink built from the same options, schema and metric settings
+     */
+    @Override
+    public DynamicTableSink copy() {
+        return new StarRocksDynamicTableSink(sinkOptions, flinkSchema,
+                inlongMetric, auditHostAndPorts, auditKeys);
+    }
+
+    /**
+     * @return the fixed summary string {@code "starrocks_sink"}
+     */
+    @Override
+    public String asSummaryString() {
+        return "starrocks_sink";
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
new file mode 100644
index 0000000000..178196d466
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.utils;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * SchemaUtils for StarRocksDynamicTableSink
+ * Deals with schema related operations such as finding the index of
+ * special field in fieldNames and filter out special field in data.
+ */
+public class SchemaUtils implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Name of the special field carrying the data time. */
+    private static final String INLONG_DATA_TIME = "inlong_data_time";
+    /** Index value used when the schema has no {@link #INLONG_DATA_TIME} field. */
+    private static final int DATA_TIME_ABSENT_INDEX = -1;
+
+    /** Position of the data time field in the schema, or {@link #DATA_TIME_ABSENT_INDEX}. */
+    private final int dataTimeFieldIndex;
+
+    /**
+     * Looks up the position of the data time field in the given schema once.
+     *
+     * @param schema table schema whose field names are searched
+     */
+    public SchemaUtils(TableSchema schema) {
+        this.dataTimeFieldIndex = getDataTimeIndex(schema.getFieldNames());
+    }
+
+    /**
+     * Returns the data time of a row.
+     *
+     * @param data field values of the row, in schema order
+     * @return the value at the data time field position, or the current system time
+     *         in milliseconds when the schema has no data time field
+     */
+    public long getDataTime(Object[] data) {
+        if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
+            // if INLONG_DATA_TIME field is absent, return local time
+            return System.currentTimeMillis();
+        }
+        // The value at this position is cast to Long; other types are not handled here.
+        return (Long) data[dataTimeFieldIndex];
+    }
+
+    /**
+     * Filters the INLONG_DATA_TIME value out of a row.
+     *
+     * @param data field values of the row, in schema order
+     * @return {@code data} itself when the schema has no data time field, otherwise a
+     *         new array without the value at the data time field position
+     */
+    public Object[] filterOutTimeField(Object[] data) {
+        if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
+            return data;
+        }
+        Object[] filteredData = new Object[data.length - 1];
+        for (int i = 0, j = 0; i < data.length; i++) {
+            if (i != dataTimeFieldIndex) {
+                filteredData[j++] = data[i];
+            }
+        }
+        return filteredData;
+    }
+
+    /**
+     * Filters INLONG_DATA_TIME out of the schema field names, since it should not
+     * occur in the actual data schema fields.
+     *
+     * @param schema table schema whose field names are filtered
+     * @return fieldNames without INLONG_DATA_TIME
+     */
+    public String[] filterOutTimeField(TableSchema schema) {
+        return Arrays.stream(schema.getFieldNames())
+                .filter(field -> !INLONG_DATA_TIME.equals(field))
+                .toArray(String[]::new);
+    }
+
+    /**
+     * Gets the index of INLONG_DATA_TIME in fieldNames.
+     *
+     * @param fieldNames field names to search
+     * @return index of the first INLONG_DATA_TIME in fieldNames, or
+     *         DATA_TIME_ABSENT_INDEX if absent
+     */
+    private static int getDataTimeIndex(String[] fieldNames) {
+        for (int i = 0; i < fieldNames.length; i++) {
+            if (INLONG_DATA_TIME.equals(fieldNames[i])) {
+                return i;
+            }
+        }
+        return DATA_TIME_ABSENT_INDEX;
+    }
+
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index fc4d7ec6a4..d9f7cf28f3 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -756,7 +756,10 @@
  Source  : debezium-connector-oracle 1.6.4.Final (Please note that the 
software have been modified.)
  License : https://github.com/debezium/debezium/blob/main/LICENSE.txt
 
-1.3.16 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+1.3.16 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/sink/StarRocksDynamicTableSinkFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/sink/table/StarRocksDynamicSinkFunctionV2.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/sink/table/StarRocksDynamicTableSink.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/sink/table/SinkFunctionFactory.java
 
   Source  : com.starrocks:flink-connector-starrocks:1.2.7_flink-1.15 (Please 
note that the software have been modified.)
   License : https://www.apache.org/licenses/LICENSE-2.0.txt


Reply via email to