This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e0ce99f6 Extract common const PARAM_PREFIX (#2874)
5e0ce99f6 is described below
commit 5e0ce99f623f2b7db527112174f8cc01c1eda16c
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Jul 21 23:22:00 2023 +0800
Extract common const PARAM_PREFIX (#2874)
* Extract common const PARAM_PREFIX
---
.../scala/org/apache/streampark/common/conf/ConfigConst.scala | 2 ++
.../streampark/flink/client/trait/FlinkClientTrait.scala | 10 +++++-----
.../org/apache/streampark/flink/core/SqlCommandParser.scala | 7 ++++---
.../streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala | 4 ++--
.../main/scala/org/apache/streampark/spark/core/Spark.scala | 2 +-
5 files changed, 14 insertions(+), 11 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 6be8accb2..9f39a914c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -24,6 +24,8 @@ object ConfigConst {
val DEFAULT_DATAMASK_STRING = "********"
+ val PARAM_PREFIX = "--"
+
/** about parameter... */
val KEY_APP_HOME = "app.home"
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 670d323da..bc10ff072 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -52,11 +52,11 @@ import scala.util.{Failure, Success, Try}
trait FlinkClientTrait extends Logger {
- private[client] lazy val PARAM_KEY_FLINK_CONF = KEY_FLINK_CONF("--")
- private[client] lazy val PARAM_KEY_FLINK_SQL = KEY_FLINK_SQL("--")
- private[client] lazy val PARAM_KEY_APP_CONF = KEY_APP_CONF("--")
- private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME("--")
- private[client] lazy val PARAM_KEY_FLINK_PARALLELISM = KEY_FLINK_PARALLELISM("--")
+ private[client] lazy val PARAM_KEY_FLINK_CONF = KEY_FLINK_CONF(PARAM_PREFIX)
+ private[client] lazy val PARAM_KEY_FLINK_SQL = KEY_FLINK_SQL(PARAM_PREFIX)
+ private[client] lazy val PARAM_KEY_APP_CONF = KEY_APP_CONF(PARAM_PREFIX)
+ private[client] lazy val PARAM_KEY_APP_NAME = KEY_APP_NAME(PARAM_PREFIX)
+ private[client] lazy val PARAM_KEY_FLINK_PARALLELISM = KEY_FLINK_PARALLELISM(PARAM_PREFIX)
@throws[Exception]
def submit(submitRequest: SubmitRequest): SubmitResponse = {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index e77bf5349..492e6e9cf 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.flink.core
+import org.apache.streampark.common.conf.ConfigConst.PARAM_PREFIX
import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
import org.apache.streampark.common.util.Logger
@@ -417,7 +418,7 @@ case class SqlSegment(start: Int, end: Int, sql: String)
object SqlSplitter {
- private lazy val singleLineCommentPrefixList = Set[String]("--")
+ private lazy val singleLineCommentPrefixList = Set[String](PARAM_PREFIX)
/**
* Split whole text into multiple sql statements. Two Steps: Step 1, split the whole text into
@@ -453,7 +454,7 @@ object SqlSplitter {
while (scanner.hasNextLine) {
lineNumber += 1
val line = scanner.nextLine().trim
- val nonEmpty = line.nonEmpty && !line.startsWith("--")
+ val nonEmpty = line.nonEmpty && !line.startsWith(PARAM_PREFIX)
if (line.startsWith("/*")) {
startComment = true
hasComment = true
@@ -618,7 +619,7 @@ object SqlSplitter {
builder.toString
}
- private[this] def isSingleLineComment(text: String) = text.trim.startsWith("--")
+ private[this] def isSingleLineComment(text: String) = text.trim.startsWith(PARAM_PREFIX)
private[this] def isMultipleLineComment(text: String) =
text.trim.startsWith("/*") && text.trim.endsWith("*/")
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
index e559c52c6..a681018a3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core.test
-import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
+import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_SQL, PARAM_PREFIX}
import org.apache.streampark.common.util.DeflaterUtils
import org.apache.streampark.flink.core.{FlinkSqlExecutor, FlinkTableInitializer, StreamTableContext}
@@ -28,7 +28,7 @@ import scala.collection.mutable.ArrayBuffer
class FlinkSqlExecuteFunSuite extends AnyFunSuite {
def execute(sql: String)(implicit func: String => Unit): Unit = {
- val args = ArrayBuffer(KEY_FLINK_SQL("--"), DeflaterUtils.zipString(sql.stripMargin))
+ val args = ArrayBuffer(KEY_FLINK_SQL(PARAM_PREFIX), DeflaterUtils.zipString(sql.stripMargin))
val context = new StreamTableContext(FlinkTableInitializer.initialize(args.toArray, null, null))
FlinkSqlExecutor.executeSql(KEY_FLINK_SQL(), context.parameter, context)
}
diff --git a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
index 186317fb8..83aec63d0 100644
--- a/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
+++ b/streampark-spark/streampark-spark-core/src/main/scala/org/apache/streampark/spark/core/Spark.scala
@@ -101,7 +101,7 @@ trait Spark extends Logger {
createOnError = value.toBoolean
argv = tail
case Nil =>
- case other :: value :: tail if other.startsWith("--") =>
+ case other :: value :: tail if other.startsWith(PARAM_PREFIX) =>
userArgs += other.drop(2) -> value
argv = tail
case tail =>