This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new aa320d163 [Improvement] add support for SinkRequest that has multiple 
statements (#3004)
aa320d163 is described below

commit aa320d163a536f7d62ad58eef52db66a7ececad4
Author: lenonhere <[email protected]>
AuthorDate: Tue Sep 12 13:07:44 2023 +0800

    [Improvement] add support for SinkRequest that has multiple statements 
(#3004)
    
    * add support for SinkRequest that has multiple statements
    
    * Update imports of ClickHouseWriterTask.scala
    
    * add unit test for SinkRequest
    
    * add Apache license
    
    ---------
    
    Co-authored-by: lenon <[email protected]>
---
 .../flink/connector/failover/SinkRequest.scala     | 30 +++++++++-----
 .../flink/connector/failover/SinkRequestTest.scala | 48 ++++++++++++++++++++++
 .../clickhouse/internal/ClickHouseWriterTask.scala | 48 ++++++++++++++--------
 3 files changed, 99 insertions(+), 27 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index 9593d6c7c..7f5fc5c9c 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -22,7 +22,9 @@ import org.apache.streampark.common.util.Logger
 import java.util
 import java.util.regex.Pattern
 
-import scala.collection.convert.ImplicitConversions._
+import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 case class SinkRequest(records: util.List[String], var attemptCounter: Int = 
0) extends Logger {
   def incrementCounter(): Unit = attemptCounter += 1
@@ -35,26 +37,34 @@ case class SinkRequest(records: util.List[String], var 
attemptCounter: Int = 0)
   private[this] lazy val INSERT_REGEXP =
     Pattern.compile("^(.*?)\\s+(values|value)(.*)", Pattern.CASE_INSENSITIVE)
 
-  lazy val sqlStatement: String = {
-    val prefixMap: Map[String, List[String]] = Map[String, List[String]]()
-    records.foreach(
+  lazy val sqlStatement: List[String] = {
+    var result: List[String] = List.empty[String]
+    val prefixMap: mutable.Map[String, ListBuffer[String]] =
+      mutable.Map.empty[String, ListBuffer[String]]
+
+    records.forEach(
       x => {
+        // group statements by the part before 'value(s)' in insert statements.
         val valueMatcher = INSERT_REGEXP.matcher(x)
         if (valueMatcher.find()) {
           val prefix = valueMatcher.group(1)
           prefixMap.get(prefix) match {
-            case Some(value) => value.add(valueMatcher.group(3))
-            case None => prefixMap.put(prefix, List(valueMatcher.group(3)))
+            case Some(value) => value += valueMatcher.group(3)
+            case None => prefixMap(prefix) = ListBuffer(valueMatcher.group(3))
           }
         } else {
+          // other statements will be ignored.
           logWarn(s"ignore record: $x")
         }
       })
-
-    prefixMap.size match {
-      case _ => prefixMap.map((m) => s"""${m._1} VALUES 
${m._2.mkString(",")}""").mkString(";")
-      case 0 => null
+    if (prefixMap.nonEmpty) {
+      // combine statements by the part before 'value(s)' in insert statements.
+      result = prefixMap.map(m => s"""${m._1} VALUES 
${m._2.mkString(",")}""").toList
     }
+
+    logDebug(s"script to commit: ${result.mkString(";")}")
+
+    result
   }
 
   lazy val table: String = {
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/test/scala/org/apache/streampark/flink/connector/failover/SinkRequestTest.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/test/scala/org/apache/streampark/flink/connector/failover/SinkRequestTest.scala
new file mode 100644
index 000000000..f1807b667
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/test/scala/org/apache/streampark/flink/connector/failover/SinkRequestTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.connector.failover
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+import scala.collection.JavaConverters._
+
+class SinkRequestTest {
+  @Test
+  def sqlStatement(): Unit = {
+    // input statements
+    val statementsList = List(
+      "insert into table_1(col1, col2) values(1, 2)",
+      "insert into table_1(col1, col2) values(11, 22)",
+      "insert into table_1(col1, col2, col3) values(11, 22, 33)",
+      "insert into table_2(col1, col2, col3) values(11, 22, 33)",
+    )
+
+    val sinkRequest = SinkRequest(statementsList.asJava)
+
+    // expected result
+    val expectedSqlStatement = List(
+      "insert into table_2(col1, col2, col3) VALUES (11, 22, 33)",
+      "insert into table_1(col1, col2) VALUES (1, 2),(11, 22)",
+      "insert into table_1(col1, col2, col3) VALUES (11, 22, 33)",
+    )
+
+    // comparison of result should be based on Set, that is, there is no need 
to care about the order of elements
+    Assertions.assertTrue(sinkRequest.sqlStatement.toSet == 
expectedSqlStatement.toSet)
+
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
index 0b9ca1054..c6afc284e 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.util.Logger
 import 
org.apache.streampark.flink.connector.clickhouse.conf.ClickHouseHttpConfig
 import org.apache.streampark.flink.connector.failover.{FailoverWriter, 
SinkRequest}
 
-import io.netty.handler.codec.http.{HttpHeaderNames, HttpHeaders}
+import io.netty.handler.codec.http.HttpHeaderNames
 import org.asynchttpclient.{AsyncHttpClient, ListenableFuture, Request, 
Response}
 
 import java.util.concurrent.{BlockingQueue, ExecutorService, TimeUnit}
@@ -63,24 +63,38 @@ case class ClickHouseWriterTask(
     }
 
   def send(sinkRequest: SinkRequest): Unit = {
-    val request = buildRequest(sinkRequest)
-    logDebug(s"Ready to load data to ${sinkRequest.table}, size: 
${sinkRequest.size}")
-    val whenResponse = asyncHttpClient.executeRequest(request)
-    val callback = respCallback(whenResponse, sinkRequest)
-    whenResponse.addListener(callback, callbackService)
+    // ClickHouse's http API does not accept EMPTY request body
+    if (sinkRequest.sqlStatement == null || sinkRequest.sqlStatement.isEmpty) {
+      logWarn(s"Skip empty sql statement")
+      return
+    }
+
+    val requests = buildRequest(sinkRequest)
+    requests.foreach(
+      request => {
+        logDebug(s"Ready to fire request: $request")
+        val whenResponse = asyncHttpClient.executeRequest(request)
+        val callback = respCallback(whenResponse, sinkRequest)
+        whenResponse.addListener(callback, callbackService)
+      })
   }
 
-  def buildRequest(sinkRequest: SinkRequest): Request = {
-    val host = clickHouseConf.getRandomHostUrl
-    val builder = asyncHttpClient
-      .preparePost(host)
-      .setRequestTimeout(clickHouseConf.timeout)
-      .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
-      .setBody(sinkRequest.sqlStatement)
-    if (clickHouseConf.credentials != null) {
-      builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + 
clickHouseConf.credentials)
-    }
-    builder.build
+  private def buildRequest(sinkRequest: SinkRequest): List[Request] = {
+    logDebug(s"There is [${sinkRequest.sqlStatement.size}] statement(s) in 
SinkRequest ")
+    // ClickHouse's http API does not accept multiple statements, so requests 
should be built by splitting statements
+    sinkRequest.sqlStatement.filter(_.nonEmpty).map(
+      statement => {
+        val host = clickHouseConf.getRandomHostUrl
+        val builder = asyncHttpClient
+          .preparePost(host)
+          .setRequestTimeout(clickHouseConf.timeout)
+          .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
+          .setBody(statement)
+        if (clickHouseConf.credentials != null) {
+          builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + 
clickHouseConf.credentials)
+        }
+        builder.build
+      })
   }
 
   def respCallback(whenResponse: ListenableFuture[Response], sinkRequest: 
SinkRequest): Runnable =

Reply via email to