This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 10a153b22 [INLONG-7249][Sort] JDBC accurate dirty data archive and
metric calculation (#7580)
10a153b22 is described below
commit 10a153b228d9615958066d901191f2c1448e1f8d
Author: Yizhou Yang <[email protected]>
AuthorDate: Tue Apr 11 17:02:50 2023 +0800
[INLONG-7249][Sort] JDBC accurate dirty data archive and metric calculation
(#7580)
Co-authored-by: Yizhou Yang <[email protected]>
---
.../sort/base/dirty/sink/log/LogDirtySink.java | 2 +
.../inlong/sort/base/dirty/RegexReplaceTest.java | 17 +-
.../jdbc/internal/JdbcBatchingOutputFormat.java | 66 +++++-
.../internal/JdbcMultiBatchingOutputFormat.java | 195 +++++++++++++++-
.../internal/TableMetricStatementExecutor.java | 249 +++++++++++++++++++++
5 files changed, 506 insertions(+), 23 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index 2884ac398..942e83da1 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -89,6 +89,8 @@ public class LogDirtySink<T> implements DirtySink<T> {
RowData.FieldGetter[] getters = fieldGetters;
if (rowType != null) {
getters = FormatUtils.parseFieldGetters(rowType);
+ } else {
+ return data.toString();
}
value = FormatUtils.csvFormat(data, getters, labels,
fieldDelimiter);
break;
diff --git
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
index 2554d0202..bc1061c81 100644
---
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
+++
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
@@ -21,19 +21,16 @@ import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
-import java.io.IOException;
-
@Slf4j
public class RegexReplaceTest {
@Test
- public void testRegexReplacement() throws IOException {
- String[] identifier = new String[2];
- identifier[0] = "yizhouyang";
- identifier[1] = "table2";
- String pattern = "${database}-${table}-${DIRTY_MESSAGE}";
- String answer = DirtySinkHelper.regexReplace(pattern,
DirtyType.BATCH_LOAD_ERROR, "mock message", identifier[0],
- identifier[1], null);
- Assert.assertEquals("yizhouyang-table2-mock message", answer);
+ public void testRegexReplacement() {
+ String database = "database1";
+ String table = "table2";
+ String pattern = "${source.table}-${source.database}-${DIRTY_MESSAGE}";
+ String answer = DirtySinkHelper.regexReplace(pattern,
DirtyType.BATCH_LOAD_ERROR, "mock message", database,
+ table, null);
+ Assert.assertEquals("table2-database1-mock message", answer);
}
}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
index 07ea97e70..f7c2f74e5 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcBatchingOutputFormat.java
@@ -28,10 +28,15 @@ import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import
org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import
org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
import org.apache.flink.connector.jdbc.utils.JdbcUtils;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -42,6 +47,7 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.metric.MetricOption;
@@ -56,6 +62,7 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.HashMap;
@@ -184,7 +191,8 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
if (!closed) {
try {
flush();
- if (sinkMetricData != null) {
+ // report is only needed when
TableMetricExecutor is not initialized
+ if (sinkMetricData != null &&
dirtySink == null) {
sinkMetricData.invoke(rowSize,
dataSize);
}
resetStateAfterFlush();
@@ -204,6 +212,16 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
private JdbcExec createAndOpenStatementExecutor(
StatementExecutorFactory<JdbcExec> statementExecutorFactory)
throws IOException {
JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
+ if (dirtySink != null) {
+ try {
+ JdbcExec newExecutor = enhanceExecutor(exec);
+ if (newExecutor != null) {
+ exec = newExecutor;
+ }
+ } catch (Exception e) {
+ LOG.error("tableStatementExecutor enhance failed", e);
+ }
+ }
try {
exec.prepareStatements(connectionProvider.getConnection());
} catch (SQLException e) {
@@ -277,7 +295,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
if (executionOptions.getBatchSize() > 0
&& batchCount >= executionOptions.getBatchSize()) {
flush();
- if (sinkMetricData != null) {
+ if (sinkMetricData != null && dirtySink == null) {
sinkMetricData.invoke(rowSize, dataSize);
}
resetStateAfterFlush();
@@ -303,7 +321,6 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
try {
jdbcStatementExecutor.addToBatch(extracted);
} catch (Exception e) {
- LOG.error(String.format("DataTypeMappingError, data: %s",
extracted), e);
handleDirtyData(extracted, DirtyType.DATA_TYPE_MAPPING_ERROR, e);
}
}
@@ -365,6 +382,49 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec
extends JdbcBatchStat
}
}
+ /**
+ * Use reflection to initialize TableMetricStatementExecutor, and replace
the original executor
+ * or upsertExecutor to calculate metrics.
+ */
+ private JdbcExec enhanceExecutor(JdbcExec exec) throws
NoSuchFieldException, IllegalAccessException {
+ if (dirtySink == null) {
+ return null;
+ }
+ final DirtySinkHelper dirtySinkHelper = new
DirtySinkHelper<>(dirtyOptions, dirtySink);
+ // enhance the actual executor into a TableMetricStatementExecutor
+ Field executorType;
+ if (exec instanceof TableBufferReducedStatementExecutor) {
+ executorType =
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+ } else if (exec instanceof TableBufferedStatementExecutor) {
+ executorType =
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+ } else {
+ throw new RuntimeException("table enhance failed, can't enhance "
+ exec.getClass());
+ }
+ executorType.setAccessible(true);
+ TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor)
executorType.get(exec);
+ // get the StatementFactory and JdbcRowConverter to initialize
TableMetricStatementExecutor.
+ Field statementFactory =
TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+ Field rowConverter =
TableSimpleStatementExecutor.class.getDeclaredField("converter");
+ statementFactory.setAccessible(true);
+ rowConverter.setAccessible(true);
+ final StatementFactory stmtFactory = (StatementFactory)
statementFactory.get(executor);
+ final JdbcRowConverter converter = (JdbcRowConverter)
rowConverter.get(executor);
+ TableMetricStatementExecutor newExecutor =
+ new TableMetricStatementExecutor(stmtFactory, converter,
dirtySinkHelper, sinkMetricData);
+ // for TableBufferedStatementExecutor, replace the executor
+ if (exec instanceof TableBufferedStatementExecutor) {
+ Field transform =
TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+ transform.setAccessible(true);
+ Function<RowData, RowData> valueTransform = (Function<RowData,
RowData>) transform.get(exec);
+ newExecutor.setValueTransform(valueTransform);
+ return (JdbcExec) newExecutor;
+ }
+ // replace the sub-executor that generates flinkSQL for executors such
as
+ // TableBufferReducedExecutor or InsertOrUpdateExecutor
+ executorType.set(exec, newExecutor);
+ return null;
+ }
+
protected void attemptFlush() throws SQLException {
jdbcStatementExecutor.executeBatch();
}
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index e812eeeff..96ab15ed6 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -28,9 +28,14 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import
org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor;
+import
org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor;
+import
org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -43,7 +48,9 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.MetricOption;
@@ -58,6 +65,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.sql.Timestamp;
@@ -108,6 +116,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
private transient ScheduledExecutorService scheduler;
private transient ScheduledFuture<?> scheduledFuture;
private transient RuntimeContext runtimeContext;
+ private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
private JdbcDmlOptions dmlOptions;
private JdbcOptions jdbcOptions;
private boolean appendMode;
@@ -118,7 +127,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
private transient Map<String, List<GenericRowData>> recordsMap = new
HashMap<>();
private transient Map<String, Exception> tableExceptionMap = new
HashMap<>();
private transient Boolean stopWritingWhenTableException;
-
private transient ListState<MetricState> metricStateListState;
private final String sinkMultipleFormat;
private final String databasePattern;
@@ -256,7 +264,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
if (null != pkNameMap.get(tableIdentifier)) {
pkNameList = pkNameMap.get(tableIdentifier);
}
- StatementExecutorFactory<JdbcExec> statementExecutorFactory = null;
+ StatementExecutorFactory<JdbcExec> statementExecutorFactory;
if (CollectionUtils.isNotEmpty(pkNameList) && !appendMode) {
// upsert query
JdbcDmlOptions createDmlOptions = JdbcDmlOptions.builder()
@@ -294,6 +302,19 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
return null;
}
connectionExecProviderMap.put(tableIdentifier,
tableConnectionProvider);
+
+ if (!stopWritingWhenTableException &&
dirtySinkHelper.getDirtySink() != null) {
+ try {
+ JdbcExec newExecutor = enhanceExecutor(jdbcExec);
+ if (newExecutor != null) {
+ jdbcExec = newExecutor;
+ }
+ } catch (Exception e) {
+ LOG.warn("enhance executor failed for class :" +
+ jdbcExec.getClass(), e);
+ }
+ }
+
jdbcExec.prepareStatements(tableConnectionProvider.getConnection());
} catch (Exception e) {
return null;
@@ -302,6 +323,49 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
return jdbcExec;
}
+ /**
+ * Use reflection to initialize TableMetricStatementExecutor, and replace
the original executor
+ * or upsertExecutor to calculate metrics.
+ */
+ private JdbcExec enhanceExecutor(JdbcExec exec) throws
NoSuchFieldException, IllegalAccessException {
+ if (dirtySinkHelper.getDirtySink() == null) {
+ return null;
+ }
+ // enhance the actual executor into a TableMetricStatementExecutor
+ Field subExecutor;
+ if (exec instanceof TableBufferReducedStatementExecutor) {
+ subExecutor =
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+ } else if (exec instanceof TableBufferedStatementExecutor) {
+ subExecutor =
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+ } else {
+ throw new RuntimeException("table enhance failed, can't enhance "
+ exec.getClass());
+ }
+ subExecutor.setAccessible(true);
+ TableSimpleStatementExecutor executor = (TableSimpleStatementExecutor)
subExecutor.get(exec);
+ // get the StatementFactory and JdbcRowConverter in order to initialize the
TableMetricStatementExecutor
+ Field statementFactory =
TableSimpleStatementExecutor.class.getDeclaredField("stmtFactory");
+ Field rowConverter =
TableSimpleStatementExecutor.class.getDeclaredField("converter");
+ statementFactory.setAccessible(true);
+ rowConverter.setAccessible(true);
+ final StatementFactory stmtFactory = (StatementFactory)
statementFactory.get(executor);
+ final JdbcRowConverter converter = (JdbcRowConverter)
rowConverter.get(executor);
+ TableMetricStatementExecutor newExecutor =
+ new TableMetricStatementExecutor(stmtFactory, converter,
dirtySinkHelper, sinkMetricData);
+ newExecutor.setMultipleSink(true);
+ // replace the original TableBufferedStatementExecutor with metric
executor
+ if (exec instanceof TableBufferedStatementExecutor) {
+ subExecutor =
TableBufferedStatementExecutor.class.getDeclaredField("valueTransform");
+ subExecutor.setAccessible(true);
+ Function<RowData, RowData> valueTransform = (Function<RowData,
RowData>) subExecutor.get(exec);
+ newExecutor.setValueTransform(valueTransform);
+ return (JdbcExec) newExecutor;
+ }
+ // replace the sub-executor that generates flinkSQL for executors such
as
+ // TableBufferReducedExecutor or InsertOrUpdateExecutor
+ subExecutor.set(exec, newExecutor);
+ return null;
+ }
+
public void getAndSetPkNamesFromDb(String tableIdentifier) {
try {
AbstractJdbcDialect jdbcDialect = (AbstractJdbcDialect)
jdbcOptions.getDialect();
@@ -332,12 +396,12 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
@Override
public final synchronized void writeRecord(In row) throws IOException {
checkFlushException();
- JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
- (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+ jsonDynamicSchemaFormat = (JsonDynamicSchemaFormat)
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+
if (row instanceof RowData) {
RowData rowData = (RowData) row;
JsonNode rootNode =
jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
- String tableIdentifier = null;
+ String tableIdentifier;
try {
if (StringUtils.isBlank(schemaPattern)) {
tableIdentifier = StringUtils.join(
@@ -354,7 +418,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
return;
}
- GenericRowData record = null;
+ GenericRowData record;
try {
RowType rowType =
jsonDynamicSchemaFormat.extractSchema(rootNode);
if (rowType != null) {
@@ -392,6 +456,58 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
}
}
+ private void fillDirtyData(JdbcExec exec, String tableIdentifier) {
+ String[] identifiers = tableIdentifier.split("\\.");
+ String database;
+ String table;
+ String schema = null;
+ if (identifiers.length == 2) {
+ database = identifiers[0];
+ table = identifiers[1];
+ } else {
+ database = identifiers[0];
+ schema = identifiers[1];
+ table = identifiers[2];
+ }
+ TableMetricStatementExecutor executor = null;
+ try {
+ Field subExecutor;
+ if (exec instanceof TableMetricStatementExecutor) {
+ executor = (TableMetricStatementExecutor) exec;
+ } else if (exec instanceof TableBufferReducedStatementExecutor) {
+ subExecutor =
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+ subExecutor.setAccessible(true);
+ executor = (TableMetricStatementExecutor)
subExecutor.get(exec);
+ } else if (exec instanceof TableBufferedStatementExecutor) {
+ subExecutor =
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+ subExecutor.setAccessible(true);
+ executor = (TableMetricStatementExecutor)
subExecutor.get(exec);
+ }
+ } catch (Exception e) {
+ LOG.error("parse executor failed", e);
+ }
+
+ try {
+ DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+ String dirtyLabel =
DirtySinkHelper.regexReplace(dirtyOptions.getLabels(),
DirtyType.BATCH_LOAD_ERROR, null,
+ database, table, schema);
+ String dirtyLogTag =
+ DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(),
DirtyType.BATCH_LOAD_ERROR, null,
+ database, table, schema);
+ String dirtyIdentifier =
+ DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(),
DirtyType.BATCH_LOAD_ERROR,
+ null, database, table, schema);
+
+ if (executor != null) {
+ executor.setDirtyMetaData(dirtyLabel, dirtyLogTag,
dirtyIdentifier);
+ } else {
+ LOG.error("executor is null, can not set metaData");
+ }
+ } catch (Exception e) {
+ LOG.error("filling dirty metadata failed", e);
+ }
+ }
+
/**
* Convert fieldMap(data) to GenericRowData with rowType(schema)
*/
@@ -510,20 +626,33 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
if (CollectionUtils.isEmpty(tableIdRecordList)) {
continue;
}
- JdbcExec jdbcStatementExecutor = null;
+ JdbcExec jdbcStatementExecutor;
Boolean flushFlag = false;
Exception tableException = null;
try {
+ getAndSetPkNamesFromDb(tableIdentifier);
jdbcStatementExecutor =
getOrCreateStatementExecutor(tableIdentifier);
Long totalDataSize = 0L;
for (GenericRowData record : tableIdRecordList) {
totalDataSize = totalDataSize +
record.toString().getBytes(StandardCharsets.UTF_8).length;
jdbcStatementExecutor.addToBatch((JdbcIn) record);
}
+ if (dirtySinkHelper.getDirtySink() != null) {
+ fillDirtyData(jdbcStatementExecutor, tableIdentifier);
+ }
jdbcStatementExecutor.executeBatch();
flushFlag = true;
- outputMetrics(tableIdentifier,
Long.valueOf(tableIdRecordList.size()),
- totalDataSize, false);
+ if (dirtySinkHelper.getDirtySink() == null) {
+ outputMetrics(tableIdentifier,
Long.valueOf(tableIdRecordList.size()),
+ totalDataSize, false);
+ } else {
+ try {
+ outputMetrics(tableIdentifier);
+ } catch (Exception e) {
+ outputMetrics(tableIdentifier,
Long.valueOf(tableIdRecordList.size()),
+ totalDataSize, false);
+ }
+ }
} catch (Exception e) {
tableException = e;
LOG.warn("Flush all data for tableIdentifier:{} get err:",
tableIdentifier, e);
@@ -547,7 +676,18 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
jdbcStatementExecutor.executeBatch();
Long totalDataSize =
Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
- outputMetrics(tableIdentifier, 1L, totalDataSize,
false);
+ if (dirtySinkHelper.getDirtySink() == null) {
+ outputMetrics(tableIdentifier, (long)
tableIdRecordList.size(),
+ totalDataSize, false);
+ } else {
+ try {
+ outputMetrics(tableIdentifier);
+ } catch (Exception e) {
+ LOG.error("JDBC table metric calculation
exception", e);
+ outputMetrics(tableIdentifier, (long)
tableIdRecordList.size(),
+ totalDataSize, false);
+ }
+ }
flushFlag = true;
break;
} catch (Exception e) {
@@ -611,6 +751,41 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
}
}
+ private void outputMetrics(String tableIdentifier) throws
NoSuchFieldException, IllegalAccessException {
+ String[] fieldArray = tableIdentifier.split("\\.");
+ // throw an exception if the executor is not enhanced
+ JdbcExec executor = jdbcExecMap.get(tableIdentifier);
+ Field subExecutor;
+ if (executor instanceof TableBufferReducedStatementExecutor) {
+ subExecutor =
TableBufferReducedStatementExecutor.class.getDeclaredField("upsertExecutor");
+ subExecutor.setAccessible(true);
+ executor = (JdbcExec) subExecutor.get(executor);
+ } else if (executor instanceof TableBufferedStatementExecutor) {
+ subExecutor =
TableBufferedStatementExecutor.class.getDeclaredField("statementExecutor");
+ subExecutor.setAccessible(true);
+ executor = (JdbcExec) subExecutor.get(executor);
+ }
+ Field metricField =
TableMetricStatementExecutor.class.getDeclaredField("metric");
+ long[] metrics = (long[]) metricField.get(executor);
+ long cleanCount = metrics[0];
+ long cleanSize = metrics[1];
+ long dirtyCount = metrics[2];
+ long dirtySize = metrics[3];
+
+ if (fieldArray.length == 3) {
+ sinkMetricData.outputDirtyMetrics(fieldArray[0], fieldArray[1],
fieldArray[2],
+ dirtyCount, dirtySize);
+ sinkMetricData.outputMetrics(fieldArray[0], fieldArray[1],
fieldArray[2],
+ cleanCount, cleanSize);
+ } else if (fieldArray.length == 2) {
+ sinkMetricData.outputDirtyMetrics(fieldArray[0], null,
fieldArray[1],
+ dirtyCount, dirtySize);
+ sinkMetricData.outputMetrics(fieldArray[0], null, fieldArray[1],
+ cleanCount, cleanSize);
+ }
+ metricField.set(executor, new long[4]);
+ }
+
/**
* Executes prepared statement and closes all resources of this instance.
*/
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
new file mode 100644
index 000000000..c5ae7cc65
--- /dev/null
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/TableMetricStatementExecutor.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
+import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
+import org.apache.flink.connector.jdbc.statement.StatementFactory;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.table.data.RowData;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JdbcBatchStatementExecutor} that simply adds the records into
batches of {@link
+ * java.sql.PreparedStatement} and doesn't buffer records in memory. Only used
in Table/SQL API.
+ * Supported executors:TableBufferedStatementExecutor,
TableBufferReducedStatementExecutor, TableSimpleStatementExecutor
+ */
+public final class TableMetricStatementExecutor implements
JdbcBatchStatementExecutor<RowData> {
+
+ private static final Pattern pattern = Pattern.compile("Batch entry
(\\d+)");
+ private static final Logger LOG =
LoggerFactory.getLogger(TableMetricStatementExecutor.class);
+ private final StatementFactory stmtFactory;
+ private final JdbcRowConverter converter;
+ private List<RowData> batch;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
+ private final SinkMetricData sinkMetricData;
+ private final AtomicInteger counter = new AtomicInteger();
+ private transient FieldNamedPreparedStatement st;
+ private boolean multipleSink;
+ private String label;
+ private String logtag;
+ private String identifier;
+ private Function<RowData, RowData> valueTransform = null;
+ // counters used for table level metric calculation for multiple sink
+ public long[] metric = new long[4];
+
+ public TableMetricStatementExecutor(StatementFactory stmtFactory,
JdbcRowConverter converter,
+ DirtySinkHelper<Object> dirtySinkHelper, SinkMetricData
sinkMetricData) {
+ this.stmtFactory = checkNotNull(stmtFactory);
+ this.converter = checkNotNull(converter);
+ this.batch = new CopyOnWriteArrayList<>();
+ this.dirtySinkHelper = dirtySinkHelper;
+ this.sinkMetricData = sinkMetricData;
+ }
+
+ public void setDirtyMetaData(String label, String logtag, String
identifier) {
+ this.label = label;
+ this.logtag = logtag;
+ this.identifier = identifier;
+ }
+
+ public void setMultipleSink(boolean multipleSink) {
+ this.multipleSink = multipleSink;
+ }
+
+ @Override
+ public void prepareStatements(Connection connection) throws SQLException {
+ st = stmtFactory.createStatement(connection);
+ }
+
+ public void setValueTransform(Function<RowData, RowData> valueTransform) {
+ this.valueTransform = valueTransform;
+ }
+
+ @Override
+ public void addToBatch(RowData record) throws SQLException {
+ if (valueTransform != null) {
+ record = valueTransform.apply(record); // copy or not
+ }
+ batch.add(record);
+ converter.toExternal(record, st);
+ st.addBatch();
+ }
+
+ @Override
+ public void executeBatch() throws SQLException {
+ try {
+ st.executeBatch();
+
+ long writtenSize = batch.size();
+ // approximate since it may be inefficient to iterate over all
writtenSize-1 elements.
+ long writtenBytes = 0L;
+ if (writtenSize > 0) {
+ writtenBytes = (long)
batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+ }
+ batch.clear();
+ if (!multipleSink) {
+ sinkMetricData.invoke(writtenSize, writtenBytes);
+ } else {
+ metric[0] += writtenSize;
+ metric[1] += writtenBytes;
+ }
+
+ } catch (SQLException e) {
+ // clear the prepared statement first to avoid exceptions
+ st.clearParameters();
+ try {
+ processErrorPosition(e);
+ } catch (Exception ex) {
+ try {
+ retryEntireBatch();
+ } catch (JsonProcessingException exc) {
+ LOG.error("dirty data archive failed");
+ }
+ }
+ }
+ }
+
+ private void processErrorPosition(SQLException e) throws SQLException {
+ List<Integer> errorPositions = parseError(e);
+ // the data before the first sqlexception are already written, handle
those and remove them.
+ int writtenSize = errorPositions.get(0);
+ long writtenBytes = 0L;
+ if (writtenSize > 0) {
+ writtenBytes = (long)
batch.get(0).toString().getBytes(StandardCharsets.UTF_8).length * writtenSize;
+ }
+ if (!multipleSink) {
+ sinkMetricData.invoke(writtenSize, writtenBytes);
+ } else {
+ metric[0] += writtenSize;
+ metric[1] += writtenBytes;
+ }
+
+ batch = batch.subList(writtenSize, batch.size());
+
+ // for the unwritten data, remove the dirty ones
+ for (int pos : errorPositions) {
+ pos -= writtenSize;
+ RowData record = batch.get(pos);
+ batch.remove(record);
+ invokeDirty(record, e);
+ }
+
+ // try to execute the supposedly clean batch, throw exception on
failure
+ for (RowData record : batch) {
+ addToBatch(record);
+ }
+ st.executeBatch();
+ batch.clear();
+ st.clearParameters();
+ }
+
+ private void retryEntireBatch() throws SQLException,
JsonProcessingException {
+ // clear parameters to make sure the batch is always clean in the end.
+ st.clearParameters();
+ for (RowData rowData : batch) {
+ try {
+ converter.toExternal(rowData, st);
+ st.addBatch();
+ st.executeBatch();
+ if (!multipleSink) {
+ sinkMetricData.invoke(1,
rowData.toString().getBytes().length);
+ } else {
+ metric[0] += 1;
+ metric[1] += rowData.toString().getBytes().length;
+ }
+ } catch (Exception e) {
+ st.clearParameters();
+ invokeDirty(rowData, e);
+ }
+ }
+ batch.clear();
+ st.clearParameters();
+ }
+
+ private void invokeDirty(RowData rowData, Exception e) {
+ if (!multipleSink) {
+ if (dirtySinkHelper != null) {
+ dirtySinkHelper.invoke(rowData.toString(),
DirtyType.BATCH_LOAD_ERROR, e);
+ }
+ sinkMetricData.invokeDirty(1,
rowData.toString().getBytes().length);
+ } else {
+ if (dirtySinkHelper != null) {
+ dirtySinkHelper.invoke(rowData.toString(),
DirtyType.BATCH_LOAD_ERROR, label, logtag, identifier, e);
+ }
+ metric[2] += 1;
+ metric[3] += rowData.toString().getBytes().length;
+ }
+ }
+
+ private List<Integer> parseError(SQLException e) throws SQLException {
+ List<Integer> errors = new ArrayList<>();
+ int pos = getPosFromMessage(e.getMessage());
+ if (pos != -1) {
+ errors.add(getPosFromMessage(e.getMessage()));
+ } else {
+ throw new SQLException(e);
+ }
+ SQLException next = e.getNextException();
+ if (next != null) {
+ errors.addAll(parseError(next));
+ }
+ return errors;
+ }
+
+ private int getPosFromMessage(String message) {
+ Matcher matcher = pattern.matcher(message);
+ if (matcher.find()) {
+ int pos = Integer.parseInt(matcher.group(1));
+            // duplicate key is a special case; can't just return the first
instance
+ if (message.contains("duplicate key")) {
+ return -1;
+ }
+ return pos;
+ }
+ LOG.error("The dirty message {} can't be parsed", message);
+ return -1;
+ }
+
+ @Override
+ public void closeStatements() throws SQLException {
+ if (st != null) {
+ st.close();
+ st = null;
+ }
+ }
+}