This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new aa320d163 [Improvement] add support for SinkRequest that has multiple statements (#3004)
aa320d163 is described below
commit aa320d163a536f7d62ad58eef52db66a7ececad4
Author: lenonhere <[email protected]>
AuthorDate: Tue Sep 12 13:07:44 2023 +0800
[Improvement] add support for SinkRequest that has multiple statements (#3004)
* add support for SinkRequest that has multiple statements
* Update imports of ClickHouseWriterTask.scala
* add unit test for SinkRequest
* add Apache license
---------
Co-authored-by: lenon <[email protected]>
---
.../flink/connector/failover/SinkRequest.scala | 30 +++++++++-----
.../flink/connector/failover/SinkRequestTest.scala | 48 ++++++++++++++++++++++
.../clickhouse/internal/ClickHouseWriterTask.scala | 48 ++++++++++++++--------
3 files changed, 99 insertions(+), 27 deletions(-)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index 9593d6c7c..7f5fc5c9c 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -22,7 +22,9 @@ import org.apache.streampark.common.util.Logger
import java.util
import java.util.regex.Pattern
-import scala.collection.convert.ImplicitConversions._
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0) extends Logger {
def incrementCounter(): Unit = attemptCounter += 1
@@ -35,26 +37,34 @@ case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0)
private[this] lazy val INSERT_REGEXP =
Pattern.compile("^(.*?)\\s+(values|value)(.*)", Pattern.CASE_INSENSITIVE)
- lazy val sqlStatement: String = {
- val prefixMap: Map[String, List[String]] = Map[String, List[String]]()
- records.foreach(
+ lazy val sqlStatement: List[String] = {
+ var result: List[String] = List.empty[String]
+ val prefixMap: mutable.Map[String, ListBuffer[String]] =
+ mutable.Map.empty[String, ListBuffer[String]]
+
+ records.forEach(
x => {
+ // group statements by the part before 'value(s)' in insert statements.
val valueMatcher = INSERT_REGEXP.matcher(x)
if (valueMatcher.find()) {
val prefix = valueMatcher.group(1)
prefixMap.get(prefix) match {
- case Some(value) => value.add(valueMatcher.group(3))
- case None => prefixMap.put(prefix, List(valueMatcher.group(3)))
+ case Some(value) => value += valueMatcher.group(3)
+ case None => prefixMap(prefix) = ListBuffer(valueMatcher.group(3))
}
} else {
+ // other statements will be ignored.
logWarn(s"ignore record: $x")
}
})
-
- prefixMap.size match {
- case _ => prefixMap.map((m) => s"""${m._1} VALUES ${m._2.mkString(",")}""").mkString(";")
- case 0 => null
+ if (prefixMap.nonEmpty) {
+ // combine statements by the part before 'value(s)' in insert statements.
+ result = prefixMap.map(m => s"""${m._1} VALUES ${m._2.mkString(",")}""").toList
}
+
+ logDebug(s"script to commit: ${result.mkString(";")}")
+
+ result
}
lazy val table: String = {
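For context, the grouping behavior the new sqlStatement introduces can be sketched standalone as follows. This is a minimal illustration, not part of the patch; the names GroupingSketch and groupInserts are made up for the example, and it assumes plain Scala 2.12+ with no StreamPark dependencies:

import java.util.regex.Pattern
import scala.collection.mutable

object GroupingSketch {
  // Same pattern as SinkRequest: group(1) is the prefix before VALUES,
  // group(3) is the value tuple that follows it.
  private val InsertRegexp =
    Pattern.compile("^(.*?)\\s+(values|value)(.*)", Pattern.CASE_INSENSITIVE)

  // Batches insert statements that share a prefix into one multi-row
  // VALUES statement; distinct prefixes yield separate statements.
  def groupInserts(records: Seq[String]): List[String] = {
    val prefixMap = mutable.LinkedHashMap.empty[String, mutable.ListBuffer[String]]
    records.foreach { r =>
      val m = InsertRegexp.matcher(r)
      if (m.find()) {
        prefixMap.getOrElseUpdate(m.group(1), mutable.ListBuffer.empty[String]) += m.group(3)
      }
    }
    prefixMap.map { case (prefix, values) => s"$prefix VALUES ${values.mkString(",")}" }.toList
  }

  def main(args: Array[String]): Unit = {
    groupInserts(Seq(
      "insert into t(a, b) values(1, 2)",
      "insert into t(a, b) values(3, 4)",
      "insert into u(c) values(5)"))
      .foreach(println)
    // prints:
    //   insert into t(a, b) VALUES (1, 2),(3, 4)
    //   insert into u(c) VALUES (5)
  }
}

Returning a List (rather than joining everything with ";") is what allows the ClickHouse writer below to send each batched statement as its own request.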
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/test/scala/org/apache/streampark/flink/connector/failover/SinkRequestTest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/test/scala/org/apache/streampark/flink/connector/failover/SinkRequestTest.scala
new file mode 100644
index 000000000..f1807b667
--- /dev/null
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/test/scala/org/apache/streampark/flink/connector/failover/SinkRequestTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.flink.connector.failover
+
+import org.junit.jupiter.api.{Assertions, Test}
+
+import scala.collection.JavaConverters._
+
+class SinkRequestTest {
+ @Test
+ def sqlStatement(): Unit = {
+ // input statements
+ val statementsList = List(
+ "insert into table_1(col1, col2) values(1, 2)",
+ "insert into table_1(col1, col2) values(11, 22)",
+ "insert into table_1(col1, col2, col3) values(11, 22, 33)",
+ "insert into table_2(col1, col2, col3) values(11, 22, 33)",
+ )
+
+ val sinkRequest = SinkRequest(statementsList.asJava)
+
+ // expected result
+ val expectedSqlStatement = List(
+ "insert into table_2(col1, col2, col3) VALUES (11, 22, 33)",
+ "insert into table_1(col1, col2) VALUES (1, 2),(11, 22)",
+ "insert into table_1(col1, col2, col3) VALUES (11, 22, 33)",
+ )
+
+ // compare the results as Sets: the order of the generated statements is not guaranteed
+ Assertions.assertTrue(sinkRequest.sqlStatement.toSet == expectedSqlStatement.toSet)
+
+ }
+}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
index 0b9ca1054..c6afc284e 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.connector.clickhouse.conf.ClickHouseHttpConfig
import org.apache.streampark.flink.connector.failover.{FailoverWriter, SinkRequest}
-import io.netty.handler.codec.http.{HttpHeaderNames, HttpHeaders}
+import io.netty.handler.codec.http.HttpHeaderNames
import org.asynchttpclient.{AsyncHttpClient, ListenableFuture, Request, Response}
import java.util.concurrent.{BlockingQueue, ExecutorService, TimeUnit}
@@ -63,24 +63,38 @@ case class ClickHouseWriterTask(
}
def send(sinkRequest: SinkRequest): Unit = {
- val request = buildRequest(sinkRequest)
- logDebug(s"Ready to load data to ${sinkRequest.table}, size:
${sinkRequest.size}")
- val whenResponse = asyncHttpClient.executeRequest(request)
- val callback = respCallback(whenResponse, sinkRequest)
- whenResponse.addListener(callback, callbackService)
+ // ClickHouse's HTTP API does not accept an empty request body
+ if (sinkRequest.sqlStatement == null || sinkRequest.sqlStatement.isEmpty) {
+ logWarn(s"Skip empty sql statement")
+ return
+ }
+
+ val requests = buildRequest(sinkRequest)
+ requests.foreach(
+ request => {
+ logDebug(s"Ready to fire request: $request")
+ val whenResponse = asyncHttpClient.executeRequest(request)
+ val callback = respCallback(whenResponse, sinkRequest)
+ whenResponse.addListener(callback, callbackService)
+ })
}
- def buildRequest(sinkRequest: SinkRequest): Request = {
- val host = clickHouseConf.getRandomHostUrl
- val builder = asyncHttpClient
- .preparePost(host)
- .setRequestTimeout(clickHouseConf.timeout)
- .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
- .setBody(sinkRequest.sqlStatement)
- if (clickHouseConf.credentials != null) {
- builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + clickHouseConf.credentials)
- }
- builder.build
+ private def buildRequest(sinkRequest: SinkRequest): List[Request] = {
+ logDebug(s"There is [${sinkRequest.sqlStatement.size}] statement(s) in
SinkRequest ")
+ // ClickHouse's http API does not accept multiple statements, so requests
should be built by splitting statements
+ sinkRequest.sqlStatement.filter(_.nonEmpty).map(
+ statement => {
+ val host = clickHouseConf.getRandomHostUrl
+ val builder = asyncHttpClient
+ .preparePost(host)
+ .setRequestTimeout(clickHouseConf.timeout)
+ .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
+ .setBody(statement)
+ if (clickHouseConf.credentials != null) {
+ builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + clickHouseConf.credentials)
+ }
+ builder.build
+ })
}
def respCallback(whenResponse: ListenableFuture[Response], sinkRequest: SinkRequest): Runnable =
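For reference, assembling one POST per statement with AsyncHttpClient can be sketched as below. This is an illustration only, assuming AsyncHttpClient 2.x on the classpath; the endpoint URL and timeout are placeholder values, not StreamPark configuration:

import io.netty.handler.codec.http.HttpHeaderNames
import org.asynchttpclient.Dsl.asyncHttpClient
import org.asynchttpclient.Request

object PerStatementRequests {
  def main(args: Array[String]): Unit = {
    val client = asyncHttpClient()
    val host = "http://localhost:8123" // placeholder ClickHouse HTTP endpoint
    val statements = List(
      "insert into t(a, b) VALUES (1, 2),(3, 4)",
      "insert into u(c) VALUES (5)")

    // One POST per statement, since ClickHouse's HTTP interface
    // executes a single statement per request body.
    val requests: List[Request] = statements.filter(_.nonEmpty).map {
      statement =>
        client
          .preparePost(host)
          .setRequestTimeout(10000) // placeholder timeout in ms
          .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8")
          .setBody(statement)
          .build()
    }
    requests.foreach(r => println(s"${r.getMethod} ${r.getUrl}"))
    client.close()
  }
}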