This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 02adb3dd6 [Improvement] avoid errors in batch insert caused by different tables or columns (#2919)
02adb3dd6 is described below
commit 02adb3dd638517e89035767540ef30bd5ae1bcae
Author: lenonhere <[email protected]>
AuthorDate: Tue Aug 8 09:18:57 2023 +0800
[Improvement] avoid errors in batch insert caused by different tables or columns (#2919)

* avoid errors in batch insert caused by different tables or columns

Refactor SinkRequest.sqlStatement to avoid errors in batch insert: 1) group statements by the part before the 'values' keyword of each insert statement and combine each group into a single batched insert; 2) keep records that target different tables or column lists in separate statements, so they no longer produce a malformed batch.

* Update SinkRequest.scala

add logger and refine warning log
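
For illustration, a minimal self-contained sketch of the grouping idea (the object name and sample rows are invented for this example; the regex is the one SinkRequest uses): each record is keyed by everything before the VALUES keyword, so only rows that share a table and column list end up in the same batch.

    import java.util.regex.Pattern

    object BatchGroupingSketch extends App {
      private val INSERT_REGEXP =
        Pattern.compile("^(.*)\\s+(values|value)(.*)", Pattern.CASE_INSENSITIVE)

      val records = Seq(
        "INSERT INTO t_user (id, name) VALUES (1, 'a')",
        "INSERT INTO t_user (id, name) VALUES (2, 'b')",
        "INSERT INTO t_order (id) VALUES (10)")

      // Key each record by its prefix (everything before VALUES),
      // then emit one combined INSERT per distinct prefix.
      records
        .flatMap { r =>
          val m = INSERT_REGEXP.matcher(r)
          if (m.find()) Some(m.group(1) -> m.group(3)) else None
        }
        .groupBy(_._1)
        .foreach { case (prefix, rows) =>
          println(s"$prefix VALUES${rows.map(_._2).mkString(",")}")
        }
      // prints (order may vary):
      //   INSERT INTO t_user (id, name) VALUES (1, 'a'), (2, 'b')
      //   INSERT INTO t_order (id) VALUES (10)
    }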
---
.../flink/connector/failover/SinkRequest.scala | 38 ++++++++++++----------
1 file changed, 21 insertions(+), 17 deletions(-)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index 37f94b013..18140f088 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -17,12 +17,14 @@
 package org.apache.streampark.flink.connector.failover
 
+import org.apache.streampark.common.util.Logger
+
 import java.util
 import java.util.regex.Pattern
 
 import scala.collection.convert.ImplicitConversions._
 
-case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0) {
+case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0) extends Logger {
 
   def incrementCounter(): Unit = attemptCounter += 1
 
   def size: Int = records.size()
@@ -34,22 +36,24 @@ case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0)
     Pattern.compile("^(.*)\\s+(values|value)(.*)", Pattern.CASE_INSENSITIVE)
 
   lazy val sqlStatement: String = {
-    val matcher = INSERT_REGEXP.matcher(records.head)
-    if (!matcher.find()) null;
-    else {
-      val prefix = matcher.group(1)
-      val values = records
-        .map(
-          x => {
-            val valueMatcher = INSERT_REGEXP.matcher(x)
-            if (valueMatcher.find()) {
-              valueMatcher.group(3)
-            } else {
-              null
-            }
-          })
-        .mkString(",")
-      s"$prefix VALUES $values"
+    val prefixMap = new util.HashMap[String, util.List[String]]()
+    records.foreach(
+      x => {
+        val valueMatcher = INSERT_REGEXP.matcher(x)
+        if (valueMatcher.find()) {
+          val prefix = valueMatcher.group(1)
+          prefixMap.get(prefix) match {
+            case null => prefixMap.put(prefix, new util.ArrayList(util.Collections.singletonList(valueMatcher.group(3))))
+            case values => values.add(valueMatcher.group(3))
+          }
+        } else {
+          logWarn(s"ignore record: $x")
+        }
+      })
+
+    prefixMap.size match {
+      case 0 => null
+      case _ => prefixMap.map(m => s"${m._1} VALUES ${m._2.mkString(",")}").mkString(";")
     }
   }
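
As a quick usage illustration (hypothetical sample rows; assumes the SinkRequest above is on the classpath): records targeting two tables now yield two semicolon-separated statements instead of one malformed batch, and a request whose records match no insert pattern yields null, which callers should handle.

    import java.util

    import org.apache.streampark.flink.connector.failover.SinkRequest

    object SinkRequestExample extends App {
      val records = new util.ArrayList[String]()
      records.add("INSERT INTO t_user (id) VALUES (1)")
      records.add("INSERT INTO t_user (id) VALUES (2)")
      records.add("INSERT INTO t_order (id) VALUES (10)")

      // One combined INSERT per distinct prefix, joined by ';'.
      println(SinkRequest(records).sqlStatement)
    }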