This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 02adb3dd6 [Improvement] avoid errors in batch insert caused by different tables or columns (#2919)
02adb3dd6 is described below

commit 02adb3dd638517e89035767540ef30bd5ae1bcae
Author: lenonhere <[email protected]>
AuthorDate: Tue Aug 8 09:18:57 2023 +0800

    [Improvement] avoid errors in batch insert caused by different tables or columns (#2919)
    
    * avoid errors in batch insert caused by different tables or columns
    
    Refactor SinkRequest.sqlStatement to avoid errors in batch insert: 1) group and combine statements by the part of each insert statement that precedes 'values'; 2) handle inserts that target different tables or column lists, which previously broke the combined batch insert.
    
    * Update SinkRequest.scala
    
    Add logger and refine warning log.
---
 .../flink/connector/failover/SinkRequest.scala     | 38 ++++++++++++----------
 1 file changed, 21 insertions(+), 17 deletions(-)

diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index 37f94b013..18140f088 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -17,12 +17,14 @@
 
 package org.apache.streampark.flink.connector.failover
 
+import org.apache.streampark.common.util.Logger
+
 import java.util
 import java.util.regex.Pattern
 
 import scala.collection.convert.ImplicitConversions._
 
-case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0) {
+case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0) extends Logger {
   def incrementCounter(): Unit = attemptCounter += 1
 
   def size: Int = records.size()
@@ -34,22 +36,24 @@ case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0)
     Pattern.compile("^(.*)\\s+(values|value)(.*)", Pattern.CASE_INSENSITIVE)
 
   lazy val sqlStatement: String = {
-    val matcher = INSERT_REGEXP.matcher(records.head)
-    if (!matcher.find()) null;
-    else {
-      val prefix = matcher.group(1)
-      val values = records
-        .map(
-          x => {
-            val valueMatcher = INSERT_REGEXP.matcher(x)
-            if (valueMatcher.find()) {
-              valueMatcher.group(3)
-            } else {
-              null
-            }
-          })
-        .mkString(",")
-      s"$prefix VALUES $values"
+    val prefixMap = new util.HashMap[String, util.List[String]]()
+    records.foreach(
+      x => {
+        val valueMatcher = INSERT_REGEXP.matcher(x)
+        if (valueMatcher.find()) {
+          // group value lists by insert prefix (table + column list)
+          val prefix = valueMatcher.group(1)
+          prefixMap
+            .computeIfAbsent(prefix, _ => new util.ArrayList[String]())
+            .add(valueMatcher.group(3))
+        } else {
+          logWarn(s"ignore record: $x")
+        }
+      })
+
+    prefixMap.size match {
+      case 0 => null // no parsable insert statement among the records
+      case _ => prefixMap.map(m => s"""${m._1} VALUES ${m._2.mkString(",")}""").mkString(";")
     }
   }
 
