This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 89e2fcad95 [INLONG-8839][Sort] Add audit metric in starrocks connector
on flink 1.15 (#8894)
89e2fcad95 is described below
commit 89e2fcad959fb8e50904d0e6310d9de12baa1263
Author: Sting <[email protected]>
AuthorDate: Mon Sep 18 11:09:40 2023 +0800
[INLONG-8839][Sort] Add audit metric in starrocks connector on flink 1.15
(#8894)
---
.../src/test/resources/flinkSql/postgres_test.sql | 6 +-
.../inlong/sort/base/metric/SinkMetricData.java | 42 ++-
.../sort-connectors/starrocks/pom.xml | 6 +
.../sink/StarRocksDynamicTableSinkFactory.java | 13 +-
.../table/sink/table/SinkFunctionFactory.java | 169 ++++++++++
.../sink/table/StarRocksDynamicSinkFunctionV2.java | 374 +++++++++++++++++++++
.../sink/table/StarRocksDynamicTableSink.java | 81 +++++
.../starrocks/table/sink/utils/SchemaUtils.java | 93 +++++
licenses/inlong-sort-connectors/LICENSE | 5 +-
9 files changed, 771 insertions(+), 18 deletions(-)
diff --git
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
index 64bfe3f606..7b9957a0fa 100644
---
a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
+++
b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres_test.sql
@@ -28,9 +28,9 @@ CREATE TABLE test_output1 (
'table-name' = 'test_output1',
'username' = 'inlong',
'password' = 'inlong',
- 'sink.buffer-flush.interval-ms' = '5000',
- 'sink.properties.column_separator' = '\x01',
- 'sink.properties.row_delimiter' = '\x02'
+ 'sink.properties.format' = 'json',
+ 'sink.properties.strip_outer_array' = 'true',
+ 'sink.buffer-flush.interval-ms' = '1000'
);
INSERT INTO test_output1 select * from test_input1;
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index b8867ad847..fa2aee68d3 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -18,7 +18,6 @@
package org.apache.inlong.sort.base.metric;
import org.apache.inlong.audit.AuditOperator;
-import org.apache.inlong.sort.base.Constants;
import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
import org.apache.flink.metrics.Counter;
@@ -26,6 +25,7 @@ import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import java.util.List;
import java.util.Map;
import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
@@ -55,6 +55,7 @@ public class SinkMetricData implements MetricData {
private Counter dirtyBytesOut;
private Meter numRecordsOutPerSecond;
private Meter numBytesOutPerSecond;
+ private List<Integer> auditKeys;
public SinkMetricData(MetricOption option, MetricGroup metricGroup) {
this.metricGroup = metricGroup;
@@ -100,6 +101,7 @@ public class SinkMetricData implements MetricData {
if (option.getIpPorts().isPresent()) {
AuditOperator.getInstance().setAuditProxy(option.getIpPortList());
this.auditOperator = AuditOperator.getInstance();
+ this.auditKeys = option.getInlongAuditKeys();
}
}
@@ -257,11 +259,39 @@ public class SinkMetricData implements MetricData {
invoke(1, getDataSize(o));
}
+    /**
+     * Reports a single output record whose size is estimated from {@code o}.
+     * Unlike {@link #invokeWithEstimate(Object)}, the audit entry is stamped with the
+     * caller-supplied {@code dataTime} rather than the current wall-clock time.
+     *
+     * @param o record used to estimate the output size via {@code getDataSize}
+     * @param dataTime time attributed to the audit entry (presumably epoch millis, like the
+     *        {@code System.currentTimeMillis()} used by the other overload — TODO confirm)
+     */
+    public void invokeWithEstimate(Object o, long dataTime) {
+        final long estimatedSize = getDataSize(o);
+        invoke(1L, estimatedSize, dataTime);
+    }
+
public void invokeDirtyWithEstimate(Object o) {
invokeDirty(1, getDataSize(o));
}
+    /**
+     * Reports {@code rowCount} records totalling {@code rowSize} to the default output metrics
+     * and to the audit operator (when configured), stamping the audit entry with the current
+     * wall-clock time.
+     *
+     * @param rowCount number of records written
+     * @param rowSize size of those records, as counted by the byte-out metrics
+     */
     public void invoke(long rowCount, long rowSize) {
+        outputDefaultMetrics(rowCount, rowSize);
+        outputAuditMetrics(rowCount, rowSize, System.currentTimeMillis());
+    }
+
+    /**
+     * Variant of {@code invoke(long, long)} that stamps the audit entry with the given
+     * {@code dataTime} instead of the current time.
+     *
+     * @param rowCount number of records written
+     * @param rowSize size of those records, as counted by the byte-out metrics
+     * @param dataTime time attributed to the audit entry
+     */
+    private void invoke(long rowCount, long rowSize, long dataTime) {
+        outputDefaultMetrics(rowCount, rowSize);
+        outputAuditMetrics(rowCount, rowSize, dataTime);
+    }
+
+    /**
+     * Sends one audit entry per configured audit key.
+     *
+     * <p>Does nothing when auditing is not configured (no audit operator) or when no audit keys
+     * were resolved from the metric option.
+     *
+     * @param rowCount number of records to report
+     * @param rowSize size of those records, as counted by the byte-out metrics
+     * @param dataTime time attributed to the audit entry
+     */
+    private void outputAuditMetrics(long rowCount, long rowSize, long dataTime) {
+        // auditKeys is assigned from MetricOption#getInlongAuditKeys() together with the audit
+        // operator; guard against a null key list so it cannot fail every record with an NPE.
+        if (auditOperator == null || auditKeys == null) {
+            return;
+        }
+        for (Integer key : auditKeys) {
+            auditOperator.add(
+                    key,
+                    getGroupId(),
+                    getStreamId(),
+                    dataTime,
+                    rowCount,
+                    rowSize);
+        }
+    }
+
+ private void outputDefaultMetrics(long rowCount, long rowSize) {
if (numRecordsOut != null) {
numRecordsOut.inc(rowCount);
}
@@ -277,16 +307,6 @@ public class SinkMetricData implements MetricData {
if (numBytesOutForMeter != null) {
numBytesOutForMeter.inc(rowSize);
}
-
- if (auditOperator != null) {
- auditOperator.add(
- Constants.AUDIT_SORT_OUTPUT,
- getGroupId(),
- getStreamId(),
- System.currentTimeMillis(),
- rowCount,
- rowSize);
- }
}
public void invokeDirty(long rowCount, long rowSize) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
index 9fdda27057..c3412e0b00 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/pom.xml
@@ -45,6 +45,10 @@
<artifactId>flink-connector-starrocks</artifactId>
<version>${starrocks-connector.version}_flink-${flink.starrocks.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </dependency>
</dependencies>
<build>
@@ -75,6 +79,8 @@
<include>org.apache.flink:flink-shaded-guava</include>
<include>com.google.protobuf:*</include>
<include>com.starrocks:flink-connector-starrocks</include>
+ <include>com.alibaba:*</include>
+
<include>com.github.jsqlparser:jsqlparser</include>
</includes>
</artifactSet>
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
index 1f31cf507b..de764e7041 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
@@ -17,7 +17,8 @@
package org.apache.inlong.sort.starrocks.table.sink;
-import com.starrocks.connector.flink.table.sink.StarRocksDynamicTableSink;
+import
org.apache.inlong.sort.starrocks.table.sink.table.StarRocksDynamicTableSink;
+
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
@@ -40,6 +41,9 @@ import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
import static
org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
+/**
+ * Factory to create StarRocksDynamicTableSink.
+ */
public class StarRocksDynamicTableSinkFactory implements
DynamicTableSinkFactory {
@Override
@@ -47,13 +51,17 @@ public class StarRocksDynamicTableSinkFactory implements
DynamicTableSinkFactory
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(StarRocksSinkOptions.SINK_PROPERTIES_PREFIX,
DIRTY_PREFIX);
ReadableConfig options = helper.getOptions();
+ String inlongMetric =
options.getOptional(INLONG_METRIC).orElse(INLONG_METRIC.defaultValue());
+ String auditHostAndPorts =
options.getOptional(INLONG_AUDIT).orElse(INLONG_AUDIT.defaultValue());
+ String auditKeys =
options.getOptional(AUDIT_KEYS).orElse(AUDIT_KEYS.defaultValue());
+
// validate some special properties
StarRocksSinkOptions sinkOptions = new StarRocksSinkOptions(options,
context.getCatalogTable().getOptions());
sinkOptions.enableUpsertDelete();
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
return new StarRocksDynamicTableSink(sinkOptions,
- physicalSchema);
+ physicalSchema, inlongMetric, auditHostAndPorts, auditKeys);
}
@Override
@@ -93,7 +101,6 @@ public class StarRocksDynamicTableSinkFactory implements
DynamicTableSinkFactory
optionalOptions.add(INLONG_METRIC);
optionalOptions.add(INLONG_AUDIT);
optionalOptions.add(AUDIT_KEYS);
-
return optionalOptions;
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java
new file mode 100644
index 0000000000..2b7571d4cd
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunction;
+import
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.tools.ConnectionUtils;
+import com.starrocks.data.load.stream.StreamLoadConstants;
+import com.starrocks.data.load.stream.StreamLoadUtils;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Create sink function according to the configuration. */
+public class SinkFunctionFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SinkFunctionFactory.class);
+
+    enum SinkVersion {
+        // Implement exactly-once using stream load which has a
+        // poor performance. All versions of StarRocks are supported
+        V1,
+        // Implement exactly-once using transaction load since StarRocks 2.4
+        V2,
+        // Select sink version automatically according to whether StarRocks
+        // supports transaction load
+        AUTO
+    }
+
+    /**
+     * Probes StarRocks for transaction stream load support by posting to the transaction
+     * "begin" URL of the first reachable load host.
+     *
+     * @param sinkOptions options providing the load URLs, connect timeout and credentials
+     * @return {@code false} only when the response has status {@code FAILED} and msg
+     *         {@code Not implemented}; {@code true} for any other JSON response
+     * @throws RuntimeException if no load host is reachable, the probe request fails, or the
+     *         response body is empty
+     */
+    public static boolean isStarRocksSupportTransactionLoad(StarRocksSinkOptions sinkOptions) {
+        String host = ConnectionUtils.selectAvailableHttpHost(
+                sinkOptions.getLoadUrlList(), sinkOptions.getConnectTimeout());
+        if (host == null) {
+            throw new RuntimeException("Can't find an available host in " + sinkOptions.getLoadUrlList());
+        }
+
+        String beginUrlStr = StreamLoadConstants.getBeginUrl(host);
+        HttpPost httpPost = new HttpPost(beginUrlStr);
+        httpPost.addHeader(HttpHeaders.AUTHORIZATION,
+                StreamLoadUtils.getBasicAuthHeader(sinkOptions.getUsername(), sinkOptions.getPassword()));
+        httpPost.setConfig(RequestConfig.custom().setExpectContinueEnabled(true).setRedirectsEnabled(true).build());
+        LOG.info("Transaction load probe post {}", httpPost);
+
+        // DefaultRedirectStrategy only treats some methods as redirectable; overriding
+        // isRedirectable lets this POST probe follow redirects as well.
+        HttpClientBuilder clientBuilder = HttpClients.custom()
+                .setRedirectStrategy(new DefaultRedirectStrategy() {
+
+                    @Override
+                    protected boolean isRedirectable(String method) {
+                        return true;
+                    }
+                });
+
+        // Both the client and the response hold connection resources, so close both.
+        try (CloseableHttpClient client = clientBuilder.build();
+                CloseableHttpResponse response = client.execute(httpPost)) {
+            String responseBody = EntityUtils.toString(response.getEntity());
+            LOG.info("Transaction load probe response {}", responseBody);
+
+            JSONObject bodyJson = JSON.parseObject(responseBody);
+            if (bodyJson == null) {
+                // Fail with a clear message instead of an NPE on the field lookups below.
+                throw new RuntimeException("Empty transaction load probe response from " + host);
+            }
+            String status = bodyJson.getString("status");
+            String msg = bodyJson.getString("msg");
+
+            // If StarRocks does not support transaction load, FE's NotFoundAction#executePost
+            // will be called where you can know how the response json is constructed
+            return !("FAILED".equals(status) && "Not implemented".equals(msg));
+        } catch (IOException e) {
+            String errMsg = "Failed to probe transaction load for " + host;
+            LOG.warn("{}", errMsg, e);
+            throw new RuntimeException(errMsg, e);
+        }
+    }
+
+    /**
+     * Records on {@code sinkOptions} whether StarRocks supports transaction stream load.
+     * If probing fails for any reason, support is assumed ({@code true}).
+     *
+     * @param sinkOptions options to probe with and to update
+     */
+    public static void detectStarRocksFeature(StarRocksSinkOptions sinkOptions) {
+        try {
+            boolean supportTransactionLoad = isStarRocksSupportTransactionLoad(sinkOptions);
+            sinkOptions.setSupportTransactionStreamLoad(supportTransactionLoad);
+            if (supportTransactionLoad) {
+                LOG.info("StarRocks supports transaction load");
+            } else {
+                LOG.info("StarRocks does not support transaction load");
+            }
+        } catch (Exception e) {
+            // Keep the cause in the log so operators can see why probing failed.
+            LOG.warn("Can't decide whether StarRocks supports transaction load, and enable it by default.", e);
+            sinkOptions.setSupportTransactionStreamLoad(true);
+        }
+    }
+
+    /**
+     * Chooses the sink version for {@code AUTO}: V2 for at-least-once semantics; otherwise V2
+     * when transaction stream load is supported and V1 when it is not.
+     *
+     * @param sinkOptions options carrying the semantic and the detected transaction-load support
+     * @return the chosen sink version, never {@code AUTO}
+     */
+    public static SinkFunctionFactory.SinkVersion chooseSinkVersionAutomatically(StarRocksSinkOptions sinkOptions) {
+        if (StarRocksSinkSemantic.AT_LEAST_ONCE.equals(sinkOptions.getSemantic())) {
+            LOG.info("Choose sink version V2 for at-least-once.");
+            return SinkVersion.V2;
+        }
+
+        if (sinkOptions.isSupportTransactionStreamLoad()) {
+            LOG.info("StarRocks supports transaction load, and choose sink version V2");
+            return SinkVersion.V2;
+        } else {
+            LOG.info("StarRocks does not support transaction load, and choose sink version V1");
+            return SinkVersion.V1;
+        }
+    }
+
+    /**
+     * Resolves the configured sink version. The value is trimmed and upper-cased before it is
+     * matched against V1, V2 and AUTO; AUTO is resolved by
+     * {@link #chooseSinkVersionAutomatically(StarRocksSinkOptions)}.
+     *
+     * @param sinkOptions options carrying the configured sink version
+     * @return the concrete sink version to use
+     * @throws UnsupportedOperationException if the configured value is not V1, V2 or AUTO
+     */
+    public static SinkVersion getSinkVersion(StarRocksSinkOptions sinkOptions) {
+        String sinkTypeOption = sinkOptions.getSinkVersion().trim().toUpperCase();
+        SinkVersion sinkVersion;
+        if (SinkVersion.V1.name().equals(sinkTypeOption)) {
+            sinkVersion = SinkVersion.V1;
+        } else if (SinkVersion.V2.name().equals(sinkTypeOption)) {
+            sinkVersion = SinkVersion.V2;
+        } else if (SinkVersion.AUTO.name().equals(sinkTypeOption)) {
+            sinkVersion = chooseSinkVersionAutomatically(sinkOptions);
+        } else {
+            throw new UnsupportedOperationException("Unsupported sink type " + sinkTypeOption);
+        }
+        LOG.info("Choose sink version {}", sinkVersion.name());
+        return sinkVersion;
+    }
+
+    /**
+     * Probes StarRocks features, resolves the sink version and creates the matching sink
+     * function.
+     *
+     * <p>Only the V2 function receives the InLong metric and audit settings. The V1 function is
+     * constructed without them, so it does not report InLong metrics.
+     *
+     * @param sinkOptions StarRocks sink options (updated with the detected transaction support)
+     * @param schema table schema of the sink
+     * @param rowTransformer transformer that turns records into StarRocks rows
+     * @param inlongMetric InLong metric labels
+     * @param auditHostAndPorts InLong audit proxy addresses
+     * @param auditKeys InLong audit keys
+     * @param <T> record type handled by the sink
+     * @return the sink function for the resolved version
+     */
+    public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(
+            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer,
+            String inlongMetric, String auditHostAndPorts, String auditKeys) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        switch (sinkVersion) {
+            case V1:
+                return new StarRocksDynamicSinkFunction<>(sinkOptions, schema, rowTransformer);
+            case V2:
+                return new StarRocksDynamicSinkFunctionV2<>(sinkOptions, schema, rowTransformer,
+                        inlongMetric, auditHostAndPorts, auditKeys);
+            default:
+                throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
+        }
+    }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
new file mode 100644
index 0000000000..70326a67a3
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.starrocks.table.sink.utils.SchemaUtils;
+
+import com.google.common.base.Strings;
+import com.starrocks.connector.flink.manager.StarRocksSinkBufferEntity;
+import com.starrocks.connector.flink.manager.StarRocksSinkManagerV2;
+import com.starrocks.connector.flink.manager.StarRocksSinkTable;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+import com.starrocks.connector.flink.table.data.StarRocksRowData;
+import
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.table.sink.StarRocksVersionedSerializer;
+import com.starrocks.connector.flink.table.sink.StarrocksSnapshotState;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+import net.sf.jsqlparser.JSQLParserException;
+import net.sf.jsqlparser.parser.CCJSqlParserUtil;
+import net.sf.jsqlparser.statement.Statement;
+import net.sf.jsqlparser.statement.alter.Alter;
+import net.sf.jsqlparser.statement.truncate.Truncate;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.NestedRowData;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.InstantiationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * StarRocks dynamic sink function. It supports insert, upsert, delete in Starrocks.
+ *
+ * <p>Records are written through {@link StarRocksSinkManagerV2}. For records that go through
+ * the serializer path, InLong sink metrics (and audit data, when configured) are reported via
+ * {@link SinkMetricData}.
+ *
+ * @param <T> type of the records handed to the sink (for example {@link RowData})
+ */
+public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunctionBase<T> {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);
+    private static final int NESTED_ROW_DATA_HEADER_SIZE = 256;
+
+    private final StarRocksSinkOptions sinkOptions;
+    private final StarRocksSinkManagerV2 sinkManager;
+    private final StarRocksISerializer serializer;
+    private final StarRocksIRowTransformer<T> rowTransformer;
+
+    private transient volatile ListState<StarrocksSnapshotState> snapshotStates;
+    private final Map<Long, List<StreamLoadSnapshot>> snapshotMap = new ConcurrentHashMap<>();
+    private transient SinkMetricData sinkMetricData;
+
+    @Deprecated
+    private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState;
+    @Deprecated
+    private transient List<StarRocksSinkBufferEntity> legacyData;
+
+    // InLong metric/audit settings; the SinkMetricData built from them is created in open().
+    private final String inlongMetric;
+    private final String auditHostAndPorts;
+    private final String auditKeys;
+    // Serialized together with the function; presumably SchemaUtils is Serializable — TODO confirm.
+    private final SchemaUtils schemaUtils;
+
+    public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
+            TableSchema schema,
+            StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
+            String auditHostAndPorts, String auditKeys) {
+        this.sinkOptions = sinkOptions;
+        this.rowTransformer = rowTransformer;
+        this.auditHostAndPorts = auditHostAndPorts;
+        this.inlongMetric = inlongMetric;
+        this.auditKeys = auditKeys;
+        StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
+                .sinkOptions(sinkOptions)
+                .build();
+        this.schemaUtils = new SchemaUtils(schema);
+        // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in
+        // StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure.
+        // The serializer sees the schema with the time field filtered out, matching the filtered
+        // rows passed to serialize() in invoke().
+        this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions,
+                schemaUtils.filterOutTimeField(schema));
+        rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
+        rowTransformer.setTableSchema(schema);
+        this.sinkManager = new StarRocksSinkManagerV2(sinkOptions.getProperties(),
+                sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+    }
+
+    @Override
+    public void invoke(T value, Context context)
+            throws IOException, ClassNotFoundException, JSQLParserException {
+
+        // serializer is only assigned in the constructor, so this path is taken only if
+        // StarRocksSerializerFactory#createSerializer returned null.
+        if (serializer == null) {
+            if (value instanceof StarRocksSinkRowDataWithMeta) {
+                StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) value;
+                if (Strings.isNullOrEmpty(data.getDatabase())
+                        || Strings.isNullOrEmpty(data.getTable())
+                        || data.getDataRows() == null) {
+                    log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
+                            data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows())));
+                    return;
+                }
+                sinkManager.write(null, data.getDatabase(), data.getTable(), data.getDataRows());
+                return;
+            } else if (value instanceof StarRocksRowData) {
+                StarRocksRowData data = (StarRocksRowData) value;
+                if (Strings.isNullOrEmpty(data.getDatabase())
+                        || Strings.isNullOrEmpty(data.getTable())
+                        || data.getRow() == null) {
+                    log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
+                            data.getDatabase(), data.getTable(), data.getRow()));
+                    return;
+                }
+                sinkManager.write(data.getUniqueKey(), data.getDatabase(), data.getTable(), data.getRow());
+                return;
+            }
+            // raw data sink
+            sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), value.toString());
+            return;
+        }
+
+        if (value instanceof NestedRowData) {
+            NestedRowData ddlData = (NestedRowData) value;
+            if (ddlData.getSegments().length != 1 || ddlData.getSegments()[0].size() < NESTED_ROW_DATA_HEADER_SIZE) {
+                return;
+            }
+
+            // The payload after the fixed-size header is a Java-serialized map describing a DDL event.
+            int totalSize = ddlData.getSegments()[0].size();
+            byte[] data = new byte[totalSize - NESTED_ROW_DATA_HEADER_SIZE];
+            ddlData.getSegments()[0].get(NESTED_ROW_DATA_HEADER_SIZE, data);
+            // NOTE(review): Java-native deserialization of bytes carried in the record; this must only
+            // ever see trusted, internally produced data.
+            Map<String, String> ddlMap = InstantiationUtil.deserializeObject(data, HashMap.class.getClassLoader());
+            if (ddlMap == null
+                    || "true".equals(ddlMap.get("snapshot"))
+                    || Strings.isNullOrEmpty(ddlMap.get("ddl"))
+                    || Strings.isNullOrEmpty(ddlMap.get("databaseName"))) {
+                return;
+            }
+            Statement statement = CCJSqlParserUtil.parse(ddlMap.get("ddl"));
+            if (statement instanceof Truncate) {
+                Truncate truncate = (Truncate) statement;
+                if (!sinkOptions.getTableName().equalsIgnoreCase(truncate.getTable().getName())) {
+                    return;
+                }
+                // TODO: add ddl to queue
+            } else if (statement instanceof Alter) {
+                // TODO: ALTER statements are currently ignored
+            }
+            // NOTE(review): DDL rows that pass the checks above are not returned here, so they fall
+            // through to the regular row handling below — confirm this is intended.
+        }
+        if (value instanceof RowData) {
+            if (RowKind.UPDATE_BEFORE.equals(((RowData) value).getRowKind())) {
+                // do not need update_before, cause an update action happened on the primary keys will be separated into
+                // `delete` and `create`
+                return;
+            }
+            if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) value).getRowKind())) {
+                // let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys.
+                return;
+            }
+        }
+
+        flushLegacyData();
+
+        Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());
+
+        outputMetrics(value, data);
+
+        sinkManager.write(
+                null,
+                sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName(),
+                serializer.serialize(schemaUtils.filterOutTimeField(data)));
+
+    }
+
+    /**
+     * Reports one output record to InLong metrics/audit, attributing it to the data time read
+     * from the transformed row. Does nothing when metrics are not configured.
+     *
+     * @param value the original record, used to estimate its size
+     * @param data the transformed row, used to read the data time
+     */
+    private void outputMetrics(T value, Object[] data) {
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value, schemaUtils.getDataTime(data));
+        }
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        sinkManager.init();
+        sinkManager.setRuntimeContext(getRuntimeContext(), sinkOptions);
+        if (rowTransformer != null) {
+            rowTransformer.setRuntimeContext(getRuntimeContext());
+        }
+
+        // The null check implies build() can return null (presumably when neither metric labels
+        // nor an audit address are configured — TODO confirm); metrics are then not reported.
+        MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
+        if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+
+        // Commit every snapshot restored in initializeState() (all ids are <= Long.MAX_VALUE).
+        notifyCheckpointComplete(Long.MAX_VALUE);
+        log.info("Open sink function v2. {}", EnvUtils.getGitInformation());
+    }
+
+    @Override
+    public void finish() {
+        sinkManager.flush();
+    }
+
+    /**
+     * Flushes pending data, then always aborts the current snapshot and closes the sink manager,
+     * even if the flush failed. A flush failure is logged and rethrown.
+     */
+    @Override
+    public void close() {
+        try {
+            sinkManager.flush();
+        } catch (Exception e) {
+            log.error("Failed to flush when closing", e);
+            throw e;
+        } finally {
+            StreamLoadSnapshot snapshot = sinkManager.snapshot();
+            sinkManager.abort(snapshot);
+            sinkManager.close();
+        }
+    }
+
+    /**
+     * Flushes buffered data. Under exactly-once semantics it also prepares the current snapshot
+     * and stores it in operator state keyed by checkpoint id; if preparing fails, the snapshot is
+     * aborted and the checkpoint fails.
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        sinkManager.flush();
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+            return;
+        }
+
+        StreamLoadSnapshot snapshot = sinkManager.snapshot();
+
+        if (sinkManager.prepare(snapshot)) {
+            snapshotMap.put(functionSnapshotContext.getCheckpointId(), Collections.singletonList(snapshot));
+
+            snapshotStates.clear();
+            snapshotStates.add(StarrocksSnapshotState.of(snapshotMap));
+        } else {
+            sinkManager.abort(snapshot);
+            throw new RuntimeException("Snapshot state failed by prepare");
+        }
+
+        if (legacyState != null) {
+            legacyState.clear();
+        }
+    }
+
+    /**
+     * Under exactly-once semantics, registers the transaction state and the legacy
+     * "buffered-rows" state. On restore, it merges the stored snapshots into
+     * {@code snapshotMap} and collects legacy buffered rows for re-sending.
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+            return;
+        }
+
+        ListStateDescriptor<byte[]> descriptor =
+                new ListStateDescriptor<>(
+                        "starrocks-sink-transaction",
+                        TypeInformation.of(new TypeHint<byte[]>() {
+                        }));
+
+        ListState<byte[]> listState = functionInitializationContext.getOperatorStateStore().getListState(descriptor);
+        snapshotStates = new SimpleVersionedListState<>(listState, new StarRocksVersionedSerializer());
+
+        // old version
+        ListStateDescriptor<Map<String, StarRocksSinkBufferEntity>> legacyDescriptor =
+                new ListStateDescriptor<>(
+                        "buffered-rows",
+                        TypeInformation.of(new TypeHint<Map<String, StarRocksSinkBufferEntity>>() {
+                        }));
+        legacyState = functionInitializationContext.getOperatorStateStore().getListState(legacyDescriptor);
+
+        if (functionInitializationContext.isRestored()) {
+            for (StarrocksSnapshotState state : snapshotStates.get()) {
+                for (Map.Entry<Long, List<StreamLoadSnapshot>> entry : state.getData().entrySet()) {
+                    snapshotMap.compute(entry.getKey(), (k, v) -> {
+                        if (v == null) {
+                            return new ArrayList<>(entry.getValue());
+                        }
+                        v.addAll(entry.getValue());
+                        return v;
+                    });
+                }
+            }
+
+            legacyData = new ArrayList<>();
+            for (Map<String, StarRocksSinkBufferEntity> entry : legacyState.get()) {
+                legacyData.addAll(entry.values());
+            }
+            log.info("There are {} items from legacy state", legacyData.size());
+        }
+    }
+
+    /**
+     * Commits, in ascending checkpoint order, every stored snapshot whose checkpoint id is
+     * {@code <= checkpointId}, and removes each one after it commits. Any commit failure is
+     * rethrown as a {@link RuntimeException}.
+     */
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) {
+
+        boolean succeed = true;
+
+        List<Long> commitCheckpointIds = snapshotMap.keySet().stream()
+                .filter(cpId -> cpId <= checkpointId)
+                .sorted(Long::compare)
+                .collect(Collectors.toList());
+
+        for (Long cpId : commitCheckpointIds) {
+            try {
+                for (StreamLoadSnapshot snapshot : snapshotMap.get(cpId)) {
+                    if (!sinkManager.commit(snapshot)) {
+                        succeed = false;
+                        break;
+                    }
+                }
+
+                if (!succeed) {
+                    throw new RuntimeException(String.format("Failed to commit some transactions for snapshot %s, " +
+                            "please check taskmanager logs for details", cpId));
+                }
+            } catch (Exception e) {
+                log.error("Failed to notify checkpoint complete, checkpoint id : {}", checkpointId, e);
+                throw new RuntimeException("Failed to notify checkpoint complete for checkpoint id " + checkpointId, e);
+            }
+
+            snapshotMap.remove(cpId);
+        }
+
+        // set legacyState to null to avoid clear it in latter snapshotState
+        legacyState = null;
+    }
+
+    /** Re-sends rows restored from the legacy "buffered-rows" state once, then forgets them. */
+    private void flushLegacyData() {
+        if (legacyData == null || legacyData.isEmpty()) {
+            return;
+        }
+
+        for (StarRocksSinkBufferEntity entity : legacyData) {
+            for (byte[] data : entity.getBuffer()) {
+                sinkManager.write(null, entity.getDatabase(), entity.getTable(),
+                        new String(data, StandardCharsets.UTF_8));
+            }
+            // Arguments follow the placeholder order: table first, then database.
+            log.info("Write {} legacy records from table '{}' of database '{}'",
+                    entity.getBuffer().size(), entity.getTable(), entity.getDatabase());
+        }
+        legacyData.clear();
+    }
+
+}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
new file mode 100644
index 0000000000..c5df06a3ab
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.table;
+
+import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import
com.starrocks.connector.flink.table.sink.StarRocksDynamicSinkFunctionBase;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * This class is used to create a DynamicTableSink for Starrocks.
+ */
+public class StarRocksDynamicTableSink implements DynamicTableSink {
+
+ private transient TableSchema flinkSchema;
+ private StarRocksSinkOptions sinkOptions;
+ private String inlongMetric;
+ private String auditHostAndPorts;
+ private String auditKeys;
+
+ public StarRocksDynamicTableSink(StarRocksSinkOptions sinkOptions,
TableSchema schema,
+ String inlongMetric, String auditHostAndPorts, String auditKeys) {
+ this.flinkSchema = schema;
+ this.sinkOptions = sinkOptions;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.auditKeys = auditKeys;
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ return requestedMode;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+
+ final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(flinkSchema.toRowDataType());
+ StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction =
SinkFunctionFactory.createSinkFunction(
+ sinkOptions,
+ flinkSchema,
+ new StarRocksTableRowTransformer(rowDataTypeInfo),
+ inlongMetric,
+ auditHostAndPorts,
+ auditKeys);
+ return SinkFunctionProvider.of(starrocksSinkFunction,
sinkOptions.getSinkParallelism());
+
+ }
+
+ @Override
+ public DynamicTableSink copy() {
+ return new StarRocksDynamicTableSink(sinkOptions, flinkSchema,
inlongMetric, auditHostAndPorts, auditKeys);
+ }
+
+ @Override
+ public String asSummaryString() {
+ return "starrocks_sink";
+ }
+
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
new file mode 100644
index 0000000000..178196d466
--- /dev/null
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/utils/SchemaUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.starrocks.table.sink.utils;
+
+import org.apache.flink.table.api.TableSchema;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * SchemaUtils for StarRocksDynamicTableSink
+ * Deals with schema related operations such as finding the index of
+ * special field in fieldNames and filter out special field in data.
+ */
+public class SchemaUtils implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final String INLONG_DATA_TIME = "inlong_data_time";
+ private final int DATA_TIME_ABSENT_INDEX = -1;
+ private final int dataTimeFieldIndex;
+
+ public SchemaUtils(TableSchema schema) {
+ this.dataTimeFieldIndex = getDataTimeIndex(schema.getFieldNames());
+ }
+
+ public long getDataTime(Object[] data) {
+ if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
+ // if INLONG_DATA_TIME field is absent, return local time
+ return System.currentTimeMillis();
+ }
+ return (Long) data[dataTimeFieldIndex];
+ }
+
+ /**
+ * filter out INLONG_DATA_TIME field
+ * @param data
+ * @return data without INLONG_DATA_TIME
+ */
+ public Object[] filterOutTimeField(Object[] data) {
+ if (dataTimeFieldIndex == DATA_TIME_ABSENT_INDEX) {
+ return data;
+ }
+ Object[] filteredData = new Object[data.length - 1];
+ for (int i = 0, j = 0; i < data.length; i++) {
+ if (i != dataTimeFieldIndex) {
+ filteredData[j++] = data[i];
+ }
+ }
+ return filteredData;
+ }
+
+ /**
+ * INLONG_DATA_TIME should not occur in actual data schema fields
+ * @param schema
+ * @return fieldNames without INLONG_DATA_TIME
+ */
+ public String[] filterOutTimeField(TableSchema schema) {
+ return Arrays.stream(schema.getFieldNames())
+ .filter(field -> !INLONG_DATA_TIME.equals(field))
+ .toArray(String[]::new);
+ }
+
+ /**
+ * get the index of INLONG_DATA_TIME in fieldNames
+ * @param fieldNames
+ * @return index of INLONG_DATA_TIME in fieldNames, or
DATA_TIME_ABSENT_INDEX if absent
+ */
+ private int getDataTimeIndex(String[] fieldNames) {
+ for (int i = 0; i < fieldNames.length; i++) {
+ if (INLONG_DATA_TIME.equals(fieldNames[i])) {
+ return i;
+ }
+ }
+ return DATA_TIME_ABSENT_INDEX;
+ }
+
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE
b/licenses/inlong-sort-connectors/LICENSE
index fc4d7ec6a4..d9f7cf28f3 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -756,7 +756,10 @@
Source : debezium-connector-oracle 1.6.4.Final (Please note that the
software have been modified.)
License : https://github.com/debezium/debezium/blob/main/LICENSE.txt
-1.3.16
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+1.3.16 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicTableSinkFactory.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicSinkFunctionV2.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/StarRocksDynamicTableSink.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/table/SinkFunctionFactory.java
Source : com.starrocks:flink-connector-starrocks:1.2.7_flink-1.15 (Please
note that the software have been modified.)
License : https://www.apache.org/licenses/LICENSE-2.0.txt