This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8d3989ab [INLONG-7075][Sort] Add table level metric and dirty data
backup for PostgreSQL (#7088)
f8d3989ab is described below
commit f8d3989ab29256dc34f5b5bca92c2a2979443c96
Author: kuansix <[email protected]>
AuthorDate: Fri Dec 30 19:23:25 2022 +0800
[INLONG-7075][Sort] Add table level metric and dirty data backup for
PostgreSQL (#7088)
---
.../apache/inlong/sort/base/dirty/DirtyType.java | 4 +
.../internal/JdbcMultiBatchingOutputFormat.java | 98 ++++++++++++++--------
.../jdbc/table/JdbcDynamicOutputFormatBuilder.java | 5 +-
3 files changed, 69 insertions(+), 38 deletions(-)
diff --git
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index 89789872a..d6bf10e61 100644
---
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -68,6 +68,10 @@ public enum DirtyType {
* Batch load error
*/
BATCH_LOAD_ERROR("BatchLoadError"),
+ /**
+ * Retry load error
+ */
+ RETRY_LOAD_ERROR("RetryLoadError"),
/**
* Unsupported data type
*/
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index 1176ca6f4..dd4be4d5c 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -42,11 +42,13 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.util.MetricStateUtils;
import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
@@ -82,6 +84,8 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
@@ -113,7 +117,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
private transient Map<String, List<String>> pkNameMap = new HashMap<>();
private transient Map<String, List<GenericRowData>> recordsMap = new
HashMap<>();
private transient Map<String, Exception> tableExceptionMap = new
HashMap<>();
- private transient Boolean isIgnoreTableException;
+ private transient Boolean stopWritingWhenTableException;
private transient ListState<MetricState> metricStateListState;
private final String sinkMultipleFormat;
@@ -121,10 +125,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
private final String tablePattern;
private final String schemaPattern;
private transient MetricState metricState;
- private SinkMetricData sinkMetricData;
- private Long dataSize = 0L;
- private Long rowSize = 0L;
+ private SinkTableMetricData sinkMetricData;
private final SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy;
+ private final DirtySinkHelper<Object> dirtySinkHelper;
private static final DateTimeFormatter
SQL_TIMESTAMP_WITH_LOCAL_TIMEZONE_FORMAT;
private static final DateTimeFormatter SQL_TIME_FORMAT;
@@ -149,7 +152,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
String schemaPattern,
String inlongMetric,
String auditHostAndPorts,
- SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy) {
+ SchemaUpdateExceptionPolicy schemaUpdateExceptionPolicy,
+ DirtySinkHelper<Object> dirtySinkHelper) {
super(connectionProvider);
this.executionOptions = checkNotNull(executionOptions);
this.dmlOptions = dmlOptions;
@@ -162,6 +166,7 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
this.inlongMetric = inlongMetric;
this.auditHostAndPorts = auditHostAndPorts;
this.schemaUpdateExceptionPolicy = schemaUpdateExceptionPolicy;
+ this.dirtySinkHelper = dirtySinkHelper;
}
/**
@@ -177,10 +182,13 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
.withInlongAudit(auditHostAndPorts)
.withInitRecords(metricState != null ?
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
.withInitBytes(metricState != null ?
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+ .withInitDirtyRecords(metricState != null ?
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+ .withInitDirtyBytes(metricState != null ?
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
.withRegisterMetric(MetricOption.RegisteredMetric.ALL)
.build();
if (metricOption != null) {
- sinkMetricData = new SinkMetricData(metricOption,
runtimeContext.getMetricGroup());
+ sinkMetricData = new SinkTableMetricData(metricOption,
runtimeContext.getMetricGroup());
+ sinkMetricData.registerSubMetricsGroup(metricState);
}
jdbcExecMap = new HashMap<>();
connectionExecProviderMap = new HashMap<>();
@@ -188,8 +196,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
rowTypeMap = new HashMap<>();
recordsMap = new HashMap<>();
tableExceptionMap = new HashMap<>();
- isIgnoreTableException =
schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
- ||
schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
+ stopWritingWhenTableException =
+
schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.ALERT_WITH_IGNORE)
+ ||
schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.STOP_PARTIAL);
if (executionOptions.getBatchIntervalMs() != 0 &&
executionOptions.getBatchSize() != 1) {
this.scheduler =
Executors.newScheduledThreadPool(
@@ -201,15 +210,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
if (!closed) {
try {
flush();
- if (sinkMetricData != null) {
- sinkMetricData.invoke(rowSize,
dataSize);
- }
- resetStateAfterFlush();
} catch (Exception e) {
- if (sinkMetricData != null) {
-
sinkMetricData.invokeDirty(rowSize, dataSize);
- }
- resetStateAfterFlush();
+ LOG.info("Synchronized flush get
Exception:", e);
}
}
}
@@ -346,8 +348,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
LOG.info("Cal tableIdentifier get Exception:", e);
return;
}
- rowSize++;
- dataSize = dataSize +
rootNode.toString().getBytes(StandardCharsets.UTF_8).length;
GenericRowData record = null;
try {
@@ -380,16 +380,8 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
if (executionOptions.getBatchSize() > 0
&& batchCount >= executionOptions.getBatchSize()) {
flush();
- if (sinkMetricData != null) {
- sinkMetricData.invoke(rowSize, dataSize);
- }
- resetStateAfterFlush();
}
} catch (Exception e) {
- if (sinkMetricData != null) {
- sinkMetricData.invokeDirty(rowSize, dataSize);
- }
- resetStateAfterFlush();
throw new IOException("Writing records to JDBC failed.", e);
}
}
@@ -465,11 +457,6 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
return record;
}
- private void resetStateAfterFlush() {
- dataSize = 0L;
- rowSize = 0L;
- }
-
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
if (sinkMetricData != null && metricStateListState != null) {
@@ -509,9 +496,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
protected void attemptFlush() throws IOException {
for (Map.Entry<String, List<GenericRowData>> entry :
recordsMap.entrySet()) {
String tableIdentifier = entry.getKey();
- boolean isIgnoreTableIdentifierException = isIgnoreTableException
+ boolean stopTableIdentifierWhenException =
stopWritingWhenTableException
&& (null != tableExceptionMap.get(tableIdentifier));
- if (isIgnoreTableIdentifierException) {
+ if (stopTableIdentifierWhenException) {
continue;
}
List<GenericRowData> tableIdRecordList = entry.getValue();
@@ -523,11 +510,15 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
Exception tableException = null;
try {
jdbcStatementExecutor =
getOrCreateStatementExecutor(tableIdentifier);
+ Long totalDataSize = 0L;
for (GenericRowData record : tableIdRecordList) {
+ totalDataSize = totalDataSize +
record.toString().getBytes(StandardCharsets.UTF_8).length;
jdbcStatementExecutor.addToBatch((JdbcIn) record);
}
jdbcStatementExecutor.executeBatch();
flushFlag = true;
+ outputMetrics(tableIdentifier,
Long.valueOf(tableIdRecordList.size()),
+ totalDataSize, false);
} catch (Exception e) {
tableException = e;
LOG.warn("Flush all data for tableIdentifier:{} get err:",
tableIdentifier, e);
@@ -549,10 +540,13 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
jdbcStatementExecutor =
getOrCreateStatementExecutor(tableIdentifier);
jdbcStatementExecutor.addToBatch((JdbcIn) record);
jdbcStatementExecutor.executeBatch();
+ Long totalDataSize =
+
Long.valueOf(record.toString().getBytes(StandardCharsets.UTF_8).length);
+ outputMetrics(tableIdentifier, 1L, totalDataSize,
false);
flushFlag = true;
break;
} catch (Exception e) {
- LOG.error("Flush one record tableIdentifier:{}
,retryTimes:{} get err:",
+ LOG.warn("Flush one record tableIdentifier:{}
,retryTimes:{} get err:",
tableIdentifier, retryTimes, e);
getAndSetPkFromErrMsg(e.getMessage(),
tableIdentifier);
tableException = e;
@@ -569,11 +563,16 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
if (!flushFlag && null != tableException) {
LOG.info("Put tableIdentifier:{} exception:{}",
tableIdentifier, tableException.getMessage());
+ outputMetrics(tableIdentifier,
Long.valueOf(tableIdRecordList.size()),
+ 1L, true);
+ if
(!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP))
{
+ dirtySinkHelper.invoke(record,
DirtyType.RETRY_LOAD_ERROR, tableException);
+ }
tableExceptionMap.put(tableIdentifier, tableException);
- if (isIgnoreTableException) {
+ if (stopWritingWhenTableException) {
LOG.info("Stop write table:{} because occur
exception",
tableIdentifier);
- continue;
+ break;
}
}
}
@@ -582,6 +581,31 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn,
JdbcExec extends JdbcBatc
}
}
+    /**
+     * Reports table-level sink metrics (normal or dirty) for one flush of a table.
+     *
+     * <p>The tableIdentifier is split on '.' and is expected to be either
+     * ${dbName}.${tbName} or ${dbName}.${schemaName}.${tbName}. Identifiers with any other
+     * number of parts cannot be mapped to a database/schema/table triple and are skipped
+     * with a warning instead of being dropped silently.
+     *
+     * <p>Metric reporting is optional: sinkMetricData is only created when a metric option is
+     * configured, so this method does nothing when it is null.
+     *
+     * @param tableIdentifier the '.'-separated table identifier (the key of recordsMap)
+     * @param rowSize number of rows to report
+     * @param dataSize number of bytes to report
+     * @param dirtyFlag true to report the values as dirty metrics, false as normal output metrics
+     */
+    private void outputMetrics(String tableIdentifier, Long rowSize, Long dataSize, boolean dirtyFlag) {
+        // sinkMetricData stays null when no metric option is configured; every other use of it in
+        // this class is null-guarded, so guard here too instead of failing the flush with an NPE.
+        if (sinkMetricData == null) {
+            return;
+        }
+        String[] fieldArray = tableIdentifier.split("\\.");
+        final String database;
+        final String schema;
+        final String table;
+        if (fieldArray.length == 3) {
+            database = fieldArray[0];
+            schema = fieldArray[1];
+            table = fieldArray[2];
+        } else if (fieldArray.length == 2) {
+            // ${dbName}.${tbName}: there is no schema level.
+            database = fieldArray[0];
+            schema = null;
+            table = fieldArray[1];
+        } else {
+            LOG.warn("Skip metrics for tableIdentifier:{} because it is not "
+                    + "${dbName}.${tbName} or ${dbName}.${schemaName}.${tbName}", tableIdentifier);
+            return;
+        }
+        if (dirtyFlag) {
+            sinkMetricData.outputDirtyMetrics(database, schema, table, rowSize, dataSize);
+        } else {
+            sinkMetricData.outputMetrics(database, schema, table, rowSize, dataSize);
+        }
+    }
+
/**
* Executes prepared statement and closes all resources of this instance.
*/
diff --git
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
index 531864cd4..ab519a03c 100644
---
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
+++
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/table/JdbcDynamicOutputFormatBuilder.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -336,6 +337,7 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
checkNotNull(jdbcOptions, "jdbc options can not be null");
checkNotNull(dmlOptions, "jdbc dml options can not be null");
checkNotNull(executionOptions, "jdbc execution options can not be
null");
+ final DirtySinkHelper<Object> dirtySinkHelper = new
DirtySinkHelper<>(dirtyOptions, dirtySink);
return new JdbcMultiBatchingOutputFormat<>(
new SimpleJdbcConnectionProvider(jdbcOptions),
executionOptions,
@@ -348,6 +350,7 @@ public class JdbcDynamicOutputFormatBuilder implements
Serializable {
schemaPattern,
inlongMetric,
auditHostAndPorts,
- schemaUpdateExceptionPolicy);
+ schemaUpdateExceptionPolicy,
+ dirtySinkHelper);
}
}
\ No newline at end of file