This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f92c1839d583b8efad66bea3b7608b347f3aa683 Author: Oneal65 <[email protected]> AuthorDate: Wed Sep 21 16:46:04 2022 +0800 [INLONG-5943][Sort] Add metric state for JDBC (#5966) --- .../jdbc/internal/AbstractJdbcOutputFormat.java | 83 ++++++++++++++++++++++ .../jdbc/internal/GenericJdbcSinkFunction.java | 5 +- .../jdbc/internal/JdbcBatchingOutputFormat.java | 40 ++++++++++- 3 files changed, 125 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java new file mode 100644 index 000000000..e45537d7e --- /dev/null +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Flushable; +import java.io.IOException; +import java.sql.Connection; + +/** Base jdbc outputFormat. */ +public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> implements Flushable { + + private static final long serialVersionUID = 1L; + public static final int DEFAULT_FLUSH_MAX_SIZE = 5000; + public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L; + + private static final Logger LOG = LoggerFactory.getLogger( + org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.class); + protected final JdbcConnectionProvider connectionProvider; + + public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) { + this.connectionProvider = Preconditions.checkNotNull(connectionProvider); + } + + @Override + public void configure(Configuration parameters) { + + } + + @Override + public void open(int taskNumber, int numTasks) throws IOException { + try { + connectionProvider.getOrEstablishConnection(); + } catch (Exception e) { + throw new IOException("unable to open JDBC writer", e); + } + } + + @Override + public void close() { + connectionProvider.closeConnection(); + } + + @Override + public void flush() throws IOException { + + } + + @VisibleForTesting + public Connection getConnection() { + return connectionProvider.getConnection(); + } + + abstract void snapshotState(FunctionSnapshotContext context) throws Exception; + + abstract void initializeState(FunctionInitializationContext context) throws Exception; +} diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java index 717ed3cd0..0afd5fb24 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java @@ -21,7 +21,6 @@ package org.apache.inlong.sort.jdbc.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -60,12 +59,14 @@ public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T> } @Override - public void initializeState(FunctionInitializationContext context) { + public void initializeState(FunctionInitializationContext context) throws Exception { + outputFormat.initializeState(context); } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { outputFormat.flush(); + outputFormat.snapshotState(context); } @Override diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java index e32d4094b..33346d686 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java @@ -19,10 +19,13 @@ package org.apache.inlong.sort.jdbc.internal; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; -import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat; import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider; import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor; @@ -30,12 +33,16 @@ import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl; import org.apache.flink.connector.jdbc.utils.JdbcUtils; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,8 +57,12 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; + import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * A JDBC outputFormat that supports batching records before writing records to database. @@ -76,6 +87,8 @@ public class JdbcBatchingOutputFormat< private transient volatile Exception flushException; private transient RuntimeContext runtimeContext; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; private SinkMetricData sinkMetricData; private Long dataSize = 0L; private Long rowSize = 0L; @@ -127,6 +140,8 @@ public class JdbcBatchingOutputFormat< MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -216,6 +231,29 @@ public class JdbcBatchingOutputFormat< jdbcStatementExecutor.addToBatch(extracted); } + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } + } + @Override public synchronized void flush() throws IOException { checkFlushException();
