gong commented on code in PR #6820:
URL: https://github.com/apache/inlong/pull/6820#discussion_r1046700541
##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java:
##########
@@ -0,0 +1,583 @@
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
+import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+
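+/**
+ * Output format that groups incoming records by table identifier and writes
+ * them to multiple JDBC tables in batches, keeping per-table statement
+ * executors and connections.
+ */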
+public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExecutor<JdbcIn>>
+ extends
+ AbstractJdbcOutputFormat<In> {
+
+ private static final long serialVersionUID = 1L;
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcMultiBatchingOutputFormat.class);
+ private final JdbcExecutionOptions executionOptions;
+ private final String inlongMetric;
+ private final String auditHostAndPorts;
+ private transient int batchCount = 0;
+ private transient volatile boolean closed = false;
+ private transient ScheduledExecutorService scheduler;
+ private transient ScheduledFuture<?> scheduledFuture;
+ private transient RuntimeContext runtimeContext;
+ private JdbcDmlOptions dmlOptions;
+ private JdbcOptions jdbcOptions;
+ private boolean appendMode;
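+    // Per-table state, keyed by table identifier: statement executors, JDBC
+    // connection providers, row types, primary-key names, buffered records,
+    // and the first exception recorded for each table.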
+ private transient Map<String, JdbcExec> jdbcExecMap = new HashMap<>();
+    private transient Map<String, SimpleJdbcConnectionProvider> connectionExecProviderMap = new HashMap<>();
+    private transient Map<String, RowType> rowTypeMap = new HashMap<>();
+    private transient Map<String, List<String>> pkNameMap = new HashMap<>();
+    private transient Map<String, List<GenericRowData>> recordsMap = new HashMap<>();
+    private transient Map<String, Exception> tableExceptionMap = new HashMap<>();
+
+ private transient ListState<MetricState> metricStateListState;
+ private final String sinkMultipleFormat;
+ private final String databasePattern;
+ private final String tablePattern;
+ private final String schemaPattern;
+ private transient MetricState metricState;
+ private SinkMetricData sinkMetricData;
+ private Long dataSize = 0L;
+ private Long rowSize = 0L;
+ private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+
+ public JdbcMultiBatchingOutputFormat(
+ @Nonnull JdbcConnectionProvider connectionProvider,
+ @Nonnull JdbcExecutionOptions executionOptions,
+ @Nonnull JdbcDmlOptions dmlOptions,
+ @Nonnull boolean appendMode,
+ @Nonnull JdbcOptions jdbcOptions,
+ String sinkMultipleFormat,
+ String databasePattern,
+ String tablePattern,
+ String schemaPattern,
+ String inlongMetric,
+ String auditHostAndPorts,
+ SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+ super(connectionProvider);
+ this.executionOptions = checkNotNull(executionOptions);
+ this.dmlOptions = dmlOptions;
+ this.appendMode = appendMode;
+ this.jdbcOptions = jdbcOptions;
+ this.sinkMultipleFormat = sinkMultipleFormat;
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ this.schemaPattern = schemaPattern;
+ this.inlongMetric = inlongMetric;
+ this.auditHostAndPorts = auditHostAndPorts;
+ this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+ }
+
+ /**
+ * Connects to the target database and initializes the prepared statement.
+ *
+ * @param taskNumber The number of the parallel instance.
+ */
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {
+ this.runtimeContext = getRuntimeContext();
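+        // Seed the metric counters from the restored metric state (if any)
+        // so that record/byte counts continue across job restarts.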
+ MetricOption metricOption = MetricOption.builder()
+ .withInlongLabels(inlongMetric)
+ .withInlongAudit(auditHostAndPorts)
+                .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
+ .build();
+ if (metricOption != null) {
+            sinkMetricData = new SinkMetricData(metricOption, runtimeContext.getMetricGroup());
+ }
+ jdbcExecMap = new HashMap<>();
+ connectionExecProviderMap = new HashMap<>();
+ pkNameMap = new HashMap<>();
+ rowTypeMap = new HashMap<>();
+ recordsMap = new HashMap<>();
+ tableExceptionMap = new HashMap<>();
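+        // With a non-zero flush interval and batching enabled (batch size != 1),
+        // schedule a periodic flush of the buffered records.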
+        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(
+                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
+ this.scheduledFuture =
+ this.scheduler.scheduleWithFixedDelay(
+ () -> {
+                                synchronized (JdbcMultiBatchingOutputFormat.this) {
+ if (!closed) {
+ try {
+ flush();
+ if (sinkMetricData != null) {
+                                                sinkMetricData.invoke(rowSize, dataSize);
+ }
+ resetStateAfterFlush();
+ } catch (Exception e) {
+ if (sinkMetricData != null) {
+                                                sinkMetricData.invokeDirty(rowSize, dataSize);
+ }
+ resetStateAfterFlush();
+ }
+ }
+ }
+ },
+ executionOptions.getBatchIntervalMs(),
+ executionOptions.getBatchIntervalMs(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
+
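+    /**
+     * Get or create a cached {@link JdbcBatchStatementExecutor} for the given
+     * table identifier, choosing an upsert executor when primary keys are
+     * present and append mode is disabled.
+     */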
+ private JdbcExec getOrCreateStatementExecutor(
+ String tableIdentifier) throws IOException {
+ if (StringUtils.isBlank(tableIdentifier)) {
+ return null;
+ }
+ JdbcExec jdbcExec = jdbcExecMap.get(tableIdentifier);
+ if (null != jdbcExec) {
+ return jdbcExec;
+ }
+ RowType rowType = rowTypeMap.get(tableIdentifier);
+ LogicalType[] logicalTypes = rowType.getFields().stream()
+ .map(RowType.RowField::getType)
+ .toArray(LogicalType[]::new);
+        String[] fieldNames = rowType.getFields().stream()
+ .map(RowType.RowField::getName)
+ .toArray(String[]::new);
+        TypeInformation<RowData> rowDataTypeInfo = InternalTypeInfo.of(rowType);
+ List<String> pkNameList = null;
+ if (null != pkNameMap.get(tableIdentifier)) {
+ pkNameList = pkNameMap.get(tableIdentifier);
+ }
+ StatementExecutorFactory<JdbcExec> statementExecutorFactory = null;
+ if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) {
+ // update
+ LOG.info("bug_test update mode");
Review Comment:
Please remove this leftover debug log (`LOG.info("bug_test update mode")`) before merging.
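
For reference, a minimal sketch of the branch with the debug line dropped (the executor construction that follows this excerpt is not shown, so `buildUpsertExecutorFactory` is a hypothetical placeholder rather than the PR's actual code):

```java
if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) {
    // Upsert path: primary keys are defined and the sink is not append-only.
    // buildUpsertExecutorFactory stands in for the construction logic that
    // follows in the full file.
    statementExecutorFactory = buildUpsertExecutorFactory(dmlOptions, rowType, pkNameList);
}
```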