This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 6572cf9f8 [INLONG-7245][Sort] Support metric and audit in
sort-connect-redis (#7309)
6572cf9f8 is described below
commit 6572cf9f8b0913b062a80fa14caa68d0d0716652
Author: feat <[email protected]>
AuthorDate: Thu Feb 2 16:55:04 2023 +0800
[INLONG-7245][Sort] Support metric and audit in sort-connect-redis (#7309)
---
.../sort/redis/sink/AbstractRedisSinkFunction.java | 69 ++++++++++++++++++----
.../sort/redis/sink/RedisBitmapSinkFunction.java | 8 ++-
.../sort/redis/sink/RedisDynamicTableSink.java | 27 +++++++--
.../sort/redis/sink/RedisHashSinkFunction.java | 9 ++-
.../sort/redis/sink/RedisPlainSinkFunction.java | 8 ++-
.../inlong/sort/redis/sink/RedisSinkFunction.java | 8 ++-
.../sort/redis/table/RedisDynamicTableFactory.java | 4 +-
7 files changed, 106 insertions(+), 27 deletions(-)
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
index 2829198ea..97ae9b9f2 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/AbstractRedisSinkFunction.java
@@ -18,17 +18,22 @@
package org.apache.inlong.sort.redis.sink;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.GuardedBy;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -37,6 +42,10 @@ import
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import
org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
import
org.apache.inlong.sort.redis.common.container.InlongRedisCommandsContainer;
import
org.apache.inlong.sort.redis.common.container.RedisCommandsContainerBuilder;
import org.apache.inlong.sort.redis.common.schema.StateEncoder;
@@ -90,8 +99,6 @@ public abstract class AbstractRedisSinkFunction<OUT>
private final List<OUT> rows;
- private transient ScheduledExecutorService executorService;
-
/**
* The container for all available Redis commands.
*/
@@ -104,6 +111,12 @@ public abstract class AbstractRedisSinkFunction<OUT>
protected transient StopWatch stopWatch;
protected StateEncoder<OUT> stateEncoder;
+ private final String auditHostAndPorts;
+
+ private final String inLongMetric;
+ private transient MetricState metricState;
+ private transient ListState<MetricState> metricStateListState;
+ private SinkMetricData sinkMetricData;
public AbstractRedisSinkFunction(
TypeInformation<OUT> outputType,
@@ -112,7 +125,9 @@ public abstract class AbstractRedisSinkFunction<OUT>
long batchSize,
Duration flushInterval,
Duration configuration,
- FlinkJedisConfigBase flinkJedisConfigBase) {
+ FlinkJedisConfigBase flinkJedisConfigBase,
+ String inLongMetric,
+ String auditHostAndPorts) {
checkNotNull(configuration, "The configuration must not be null.");
this.stateEncoder = stateEncoder;
@@ -124,6 +139,8 @@ public abstract class AbstractRedisSinkFunction<OUT>
this.forceFlush = false;
this.rows = new ArrayList<>();
this.flinkJedisConfigBase = flinkJedisConfigBase;
+ this.inLongMetric = inLongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
}
@Override
@@ -152,11 +169,34 @@ public abstract class AbstractRedisSinkFunction<OUT>
outputFlusher = Optional.of(new OutputFlusher(threadName,
flushIntervalInMillis));
outputFlusher.get().start();
}
-
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inLongMetric)
+ .withInlongAudit(auditHostAndPorts)
+ .withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+ .withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ?
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+ .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+ sinkMetricData = new SinkMetricData(metricOption,
getRuntimeContext().getMetricGroup());
+ }
}
@Override
public void initializeState(FunctionInitializationContext context) throws
Exception {
+ if (this.inLongMetric != null) {
+ this.metricStateListState =
context.getOperatorStateStore().getUnionListState(
+ new ListStateDescriptor<>(
+ INLONG_METRIC_STATE_NAME, TypeInformation.of(new
TypeHint<MetricState>() {
+ })));
+ }
+
+ if (context.isRestored()) {
+ metricState =
MetricStateUtils.restoreMetricState(metricStateListState,
+ getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
+ }
+
final ListStateDescriptor<OUT> stateDescriptor = new
ListStateDescriptor<>(
"rowState", outputType);
this.listState =
context.getOperatorStateStore().getListState(stateDescriptor);
@@ -174,6 +214,10 @@ public abstract class AbstractRedisSinkFunction<OUT>
listState.clear();
listState.addAll(rows);
}
+ if (sinkMetricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState,
sinkMetricData,
+ getRuntimeContext().getIndexOfThisSubtask());
+ }
LOG.info("redis end snapshotState, id: {}",
functionSnapshotContext.getCheckpointId());
}
@@ -189,6 +233,7 @@ public abstract class AbstractRedisSinkFunction<OUT>
public void invoke(RowData in, Context context) {
List<OUT> redisOutputs = serialize(in);
+ sendMetrics(in.toString().getBytes());
synchronized (lock) {
rows.addAll(redisOutputs);
if (forceFlush || rows.size() >= batchSize) {
@@ -202,14 +247,6 @@ public abstract class AbstractRedisSinkFunction<OUT>
public void close() throws Exception {
closeClient();
- if (executorService != null) {
- try {
- executorService.shutdown();
- } catch (Throwable t) {
- LOG.warn("Could not properly shut down
ScheduledExecutorService.", t);
- }
- }
-
super.close();
LOG.info("Closed redis sink.");
@@ -289,4 +326,10 @@ public abstract class AbstractRedisSinkFunction<OUT>
}
}
}
+
+ protected void sendMetrics(byte[] document) {
+ if (sinkMetricData != null) {
+ sinkMetricData.invoke(1, document.length);
+ }
+ }
}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
index 4e59aa252..d68ff3d28 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisBitmapSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisBitmapSinkFunction
long batchSize,
Duration flushInterval,
Duration configuration,
- FlinkJedisConfigBase flinkJedisConfigBase) {
+ FlinkJedisConfigBase flinkJedisConfigBase,
+ String inlongMetric,
+ String auditHostAndPorts) {
super(TypeInformation.of(new TypeHint<Tuple4<Boolean, String, Long,
Boolean>>() {
}),
serializationSchema,
@@ -53,7 +55,9 @@ public class RedisBitmapSinkFunction
batchSize,
flushInterval,
configuration,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+ inlongMetric,
+ auditHostAndPorts);
LOG.info("Creating RedisBitmapStaticKvPairSinkFunction ...");
}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
index df16979f2..2e96e3c1d 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisDynamicTableSink.java
@@ -59,13 +59,18 @@ public class RedisDynamicTableSink implements
DynamicTableSink {
private final ReadableConfig config;
private final Map<String, String> properties;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+
public RedisDynamicTableSink(
EncodingFormat<SerializationSchema<RowData>> format,
ResolvedSchema resolvedSchema,
RedisDataType dataType,
SchemaMappingMode schemaMappingMode,
ReadableConfig config,
- Map<String, String> properties) {
+ Map<String, String> properties,
+ String inlongMetric,
+ String auditHostAndPorts) {
this.format = format;
this.resolvedSchema = resolvedSchema;
this.dataType = dataType;
@@ -73,6 +78,9 @@ public class RedisDynamicTableSink implements
DynamicTableSink {
this.config = config;
this.properties = properties;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+
flinkJedisConfigBase = RedisHandlerServices
.findRedisHandler(InlongJedisConfigHandler.class, properties)
.createFlinkJedisConfig(config);
@@ -117,7 +125,9 @@ public class RedisDynamicTableSink implements
DynamicTableSink {
batchSize,
flushInterval,
expireTime,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+ inlongMetric,
+ auditHostAndPorts);
break;
case PLAIN:
redisSinkFunction = new RedisPlainSinkFunction(
@@ -126,7 +136,9 @@ public class RedisDynamicTableSink implements
DynamicTableSink {
batchSize,
flushInterval,
expireTime,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+ inlongMetric,
+ auditHostAndPorts);
break;
case BITMAP:
redisSinkFunction = new RedisBitmapSinkFunction(
@@ -135,7 +147,10 @@ public class RedisDynamicTableSink implements
DynamicTableSink {
batchSize,
flushInterval,
expireTime,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+
+ inlongMetric,
+ auditHostAndPorts);
break;
default:
throw new UnsupportedOperationException();
@@ -178,7 +193,9 @@ public class RedisDynamicTableSink implements
DynamicTableSink {
dataType,
mappingMode,
config,
- properties);
+ properties,
+ inlongMetric,
+ auditHostAndPorts);
}
@Override
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
index 7a2c25a9c..0874ac2f5 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisHashSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisHashSinkFunction
long batchSize,
Duration flushInterval,
Duration expireTime,
- FlinkJedisConfigBase flinkJedisConfigBase) {
+ FlinkJedisConfigBase flinkJedisConfigBase,
+ String inlongMetric,
+ String auditHostAndPorts) {
super(TypeInformation.of(new TypeHint<Tuple4<Boolean, String, String,
String>>() {
}),
serializationSchema,
@@ -53,7 +55,10 @@ public class RedisHashSinkFunction
batchSize,
flushInterval,
expireTime,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+
+ inlongMetric,
+ auditHostAndPorts);
LOG.info("Creating RedisHashSinkFunction ...");
}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
index e8fc817bd..6163b866f 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisPlainSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisPlainSinkFunction
long batchSize,
Duration flushInterval,
Duration configuration,
- FlinkJedisConfigBase flinkJedisConfigBase) {
+ FlinkJedisConfigBase flinkJedisConfigBase,
+ String inlongMetric,
+ String auditHostAndPorts) {
super(TypeInformation.of(new TypeHint<Tuple3<Boolean, String,
String>>() {
}),
serializationSchema,
@@ -53,7 +55,9 @@ public class RedisPlainSinkFunction
batchSize,
flushInterval,
configuration,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+ inlongMetric,
+ auditHostAndPorts);
checkNotNull(serializationSchema, "The serialization schema must not
be null.");
ensureSerializable(serializationSchema);
}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
index 232f81486..0f5f3e489 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/sink/RedisSinkFunction.java
@@ -45,7 +45,9 @@ public class RedisSinkFunction
long batchSize,
Duration flushInterval,
Duration configuration,
- FlinkJedisConfigBase flinkJedisConfigBase) {
+ FlinkJedisConfigBase flinkJedisConfigBase,
+ String inlongMetric,
+ String auditHostAndPorts) {
super(TypeInformation.of(new TypeHint<Tuple3<Boolean, String,
String>>() {
}),
serializationSchema,
@@ -53,7 +55,9 @@ public class RedisSinkFunction
batchSize,
flushInterval,
configuration,
- flinkJedisConfigBase);
+ flinkJedisConfigBase,
+ inlongMetric,
+ auditHostAndPorts);
checkNotNull(serializationSchema, "The serialization schema must not
be null.");
ensureSerializable(serializationSchema);
}
diff --git
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
index 21657c69e..8edf979e9 100644
---
a/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
+++
b/inlong-sort/sort-connectors/redis/src/main/java/org/apache/inlong/sort/redis/table/RedisDynamicTableFactory.java
@@ -128,7 +128,9 @@ public class RedisDynamicTableFactory implements
DynamicTableSourceFactory, Dyna
dataType,
schemaMappingMode,
config,
- properties);
+ properties,
+ inlongMetric,
+ auditHostAndPorts);
}
private RedisLookupOptions getJdbcLookupOptions(ReadableConfig
readableConfig) {