This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a5a7925415b [FLINK-39651][table] REGEXP plan-time validation and 
hot-path log cleanup (#28292)
a5a7925415b is described below

commit a5a7925415bdf6f257b100cc27e9aeb19d096d04
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue Jun 2 17:15:47 2026 +0200

    [FLINK-39651][table] REGEXP plan-time validation and hot-path log cleanup 
(#28292)
    
    * [FLINK-39651][table] Drop hot-path error log in REGEXP
    
    SqlFunctionUtils.regExp caught any exception from pattern compilation with 
LOG.error, producing one stack trace per record processed when an invalid regex 
was supplied (for example a column-referenced or function-built pattern that 
fails to compile).
    
    The pattern is already looked up through REGEXP_PATTERN_CACHE, so the only 
hot-path cost left was the logging. PatternSyntaxException is now caught 
silently and returns false, preserving the prior runtime contract without 
flooding the log. Other exception types are no longer swallowed.
    
    Adds runtime IT cases for REGEXP in RegexpFunctionsITCase covering literal 
valid/no-match, null input, column-ref invalid, and function-call regex paths.
    
    * [FLINK-39651][table] Route REGEXP through BridgingSqlFunction
    
    Migrates REGEXP to the modern BuiltInFunctionDefinition stack while keeping 
the existing StringCallGen codegen path.
    
    The function definition is renamed to name("REGEXP") and marked 
runtimeProvided() so it is exposed to both SQL and Table API conversion through 
CoreModule. A new arm in ExprCodeGenerator's BridgingSqlFunction block routes 
the call to StringCallGen.generateRegExp. The legacy 
FlinkSqlOperatorTable.REGEXP entry, the DirectConvertRule mapping, and the case 
REGEXP arm in StringCallGen are removed.
    
    Behavior is preserved: same arg types, same codegen call, same runtime 
helper. Only the registration path changes. The Table API and Python string 
representation now renders as REGEXP(...) to match the function name.
    
    * [FLINK-39651][table] Validate REGEXP literal patterns at plan time
    
    Adds RegexpInputTypeStrategy which validates the two STRING arguments and 
delegates the literal-pattern check to the shared 
StrategyUtils.validateLiteralPattern helper introduced for the regex function 
family. Literal regex arguments are eagerly compiled during type inference and 
surface failures as ValidationException; non-literal or null-literal arguments 
are deferred to runtime.
    
    REGEXP is wired through the new strategy via 
SpecificInputTypeStrategies.REGEXP.
    
    Tests: new RegexpInputTypeStrategyTest covers the four branches 
(non-literal defers, valid literal compiles, null literal is deferred, invalid 
literal fails). RegexpFunctionsITCase gains a plan-time validation TestSetSpec 
covering both Table API and SQL paths.
---
 .../pyflink/table/tests/test_expression.py         |  2 +-
 .../functions/BuiltInFunctionDefinitions.java      |  8 +--
 .../strategies/RegexpInputTypeStrategy.java        | 83 ++++++++++++++++++++++
 .../strategies/SpecificInputTypeStrategies.java    |  3 +
 .../strategies/RegexpInputTypeStrategyTest.java    | 57 +++++++++++++++
 .../expressions/converter/DirectConvertRule.java   |  2 -
 .../functions/sql/FlinkSqlOperatorTable.java       |  9 ---
 .../table/planner/codegen/ExprCodeGenerator.scala  |  3 +
 .../planner/codegen/calls/StringCallGen.scala      |  2 -
 .../planner/functions/RegexpFunctionsITCase.java   | 47 ++++++++++++
 .../table/runtime/functions/SqlFunctionUtils.java  | 15 ++--
 11 files changed, 207 insertions(+), 24 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index e3681e0a184..2b957e7aea6 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -193,7 +193,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("MAKE_VALID_UTF8(a)", str(expr1.make_valid_utf8))
 
         # regexp functions
-        self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
+        self.assertEqual("REGEXP(a, b)", str(expr1.regexp(expr2)))
         self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
         self.assertEqual('REGEXP_EXTRACT(a, b)', 
str(expr1.regexp_extract(expr2)))
         self.assertEqual('REGEXP_EXTRACT(a, b, 3)', 
str(expr1.regexp_extract(expr2, 3)))
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 0b9ea76b175..7025a51904f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1714,13 +1714,11 @@ public final class BuiltInFunctionDefinitions {
 
     public static final BuiltInFunctionDefinition REGEXP =
             BuiltInFunctionDefinition.newBuilder()
-                    .name("regexp")
+                    .name("REGEXP")
                     .kind(SCALAR)
-                    .inputTypeStrategy(
-                            sequence(
-                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
-                                    
logical(LogicalTypeFamily.CHARACTER_STRING)))
+                    .inputTypeStrategy(SpecificInputTypeStrategies.REGEXP)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
+                    .runtimeProvided()
                     .build();
 
     public static final BuiltInFunctionDefinition REGEXP_REPLACE =
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategy.java
new file mode 100644
index 00000000000..b80a04bee6e
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategy.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
+
+/**
+ * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP}. 
Validates literal regex
+ * patterns at planning time.
+ */
+@Internal
+public class RegexpInputTypeStrategy implements InputTypeStrategy {
+
+    private static final int ARG_STR = 0;
+    private static final int ARG_REGEX = 1;
+
+    private static final ArgumentTypeStrategy STRING_ARG =
+            logical(LogicalTypeFamily.CHARACTER_STRING);
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.of(2);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        if (STRING_ARG.inferArgumentType(callContext, ARG_STR, 
throwOnFailure).isEmpty()) {
+            return Optional.empty();
+        }
+        if (STRING_ARG.inferArgumentType(callContext, ARG_REGEX, 
throwOnFailure).isEmpty()) {
+            return Optional.empty();
+        }
+
+        final Optional<List<DataType>> patternError =
+                StrategyUtils.validateLiteralPattern(callContext, ARG_REGEX, 
throwOnFailure);
+        if (patternError.isPresent()) {
+            return patternError;
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(final FunctionDefinition 
definition) {
+        return List.of(
+                Signature.of(
+                        Argument.ofGroup("str", 
LogicalTypeFamily.CHARACTER_STRING),
+                        Argument.ofGroup("regex", 
LogicalTypeFamily.CHARACTER_STRING)));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index fc6cd13177f..dc7167a656d 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -194,6 +194,9 @@ public final class SpecificInputTypeStrategies {
     /** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. 
*/
     public static final InputTypeStrategy TO_TIMESTAMP_LTZ = new 
ToTimestampLtzInputTypeStrategy();
 
+    /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP}. */
+    public static final InputTypeStrategy REGEXP = new 
RegexpInputTypeStrategy();
+
     /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. */
     public static final InputTypeStrategy REGEXP_EXTRACT = new 
RegexpExtractInputTypeStrategy();
 
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategyTest.java
new file mode 100644
index 00000000000..6610eee392d
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpInputTypeStrategyTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP;
+
+/** Tests for {@link RegexpInputTypeStrategy}. */
+class RegexpInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                // Non-literal regex skips the plan-time compile check and is 
deferred to runtime.
+                TestSpec.forStrategy("Non-literal regex defers compile to 
runtime", REGEXP)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+
+                // Valid literal regex compiles cleanly at plan time.
+                TestSpec.forStrategy("Valid literal regex compiles", REGEXP)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .calledWithLiteralAt(1, "foo(.*?)bar")
+                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+
+                // Null literal regex short-circuits the plan-time check; 
runtime returns null.
+                TestSpec.forStrategy("Null regex literal is deferred", REGEXP)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .calledWithLiteralAt(1, null)
+                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+
+                // Invalid literal regex surfaces as a ValidationException at 
plan time.
+                TestSpec.forStrategy("Invalid literal regex fails at plan 
time", REGEXP)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .calledWithLiteralAt(1, "(")
+                        .expectErrorMessage("Invalid regular expression for"));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 17dffb8613f..11b9545499a 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -195,8 +195,6 @@ public class DirectConvertRule implements 
CallExpressionConvertRule {
                 BuiltInFunctionDefinitions.RTRIM, FlinkSqlOperatorTable.RTRIM);
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.REPEAT, 
FlinkSqlOperatorTable.REPEAT);
-        definitionSqlOperatorHashMap.put(
-                BuiltInFunctionDefinitions.REGEXP, 
FlinkSqlOperatorTable.REGEXP);
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.REVERSE, 
FlinkSqlOperatorTable.REVERSE);
         definitionSqlOperatorHashMap.put(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index f44f1806df3..7bcfd1e266e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -597,15 +597,6 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.TIMEDATE);
 
-    public static final SqlFunction REGEXP =
-            new SqlFunction(
-                    "REGEXP",
-                    SqlKind.OTHER_FUNCTION,
-                    ReturnTypes.BOOLEAN_NULLABLE,
-                    null,
-                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
-                    SqlFunctionCategory.STRING);
-
     public static final SqlFunction PARSE_URL =
             new SqlFunction(
                     "PARSE_URL",
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index f07bf80bc05..b05eac28261 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -952,6 +952,9 @@ class ExprCodeGenerator(
           case BuiltInFunctionDefinitions.JSON =>
             new JsonCallGen().generate(ctx, operands, 
FlinkTypeFactory.toLogicalType(call.getType))
 
+          case BuiltInFunctionDefinitions.REGEXP =>
+            StringCallGen.generateRegExp(ctx, operands, resultType)
+
           case BuiltInFunctionDefinitions.REGEXP_EXTRACT =>
             StringCallGen.generateRegexpExtract(ctx, operands, resultType)
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 72b3ce2e036..33e335f0549 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -134,8 +134,6 @@ object StringCallGen {
 
       case CHR => generateChr(ctx, operands, returnType)
 
-      case REGEXP => generateRegExp(ctx, operands, returnType)
-
       case BIN => generateBin(ctx, operands, returnType)
 
       case CONCAT_FUNCTION =>
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index e62010bd0c1..5d920617512 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -33,6 +33,7 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
         return Stream.of(
+                        regexpTestCases(),
                         regexpCountTestCases(),
                         regexpExtractTestCases(),
                         regexpExtractAllTestCases(),
@@ -42,6 +43,52 @@ class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
                 .flatMap(s -> s);
     }
 
+    private Stream<TestSetSpec> regexpTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP)
+                        .onFieldsWithData(null, "foobar", "(")
+                        .andDataTypes(DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
+                        .testResult(
+                                $("f0").regexp("foo"),
+                                "REGEXP(f0, 'foo')",
+                                null,
+                                DataTypes.BOOLEAN().nullable())
+                        .testResult(
+                                $("f1").regexp("foo"),
+                                "REGEXP(f1, 'foo')",
+                                true,
+                                DataTypes.BOOLEAN().nullable())
+                        .testResult(
+                                $("f1").regexp("xyz"),
+                                "REGEXP(f1, 'xyz')",
+                                false,
+                                DataTypes.BOOLEAN().nullable())
+                        .testResult(
+                                $("f1").regexp($("f2")),
+                                "REGEXP(f1, f2)",
+                                false,
+                                DataTypes.BOOLEAN().nullable())
+                        .testResult(
+                                $("f1").regexp(concat("fo", "o")),
+                                "REGEXP(f1, 'fo' || 'o')",
+                                true,
+                                DataTypes.BOOLEAN().nullable())
+                        .testResult(
+                                $("f1").regexp(concat("(", "")),
+                                "REGEXP(f1, '(' || '')",
+                                false,
+                                DataTypes.BOOLEAN().nullable()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.REGEXP,
+                                "Invalid literal regex fails at plan time")
+                        .onFieldsWithData("foobar")
+                        .andDataTypes(DataTypes.STRING())
+                        .testTableApiValidationError(
+                                $("f0").regexp("("), "Invalid regular 
expression for REGEXP:")
+                        .testSqlValidationError(
+                                "REGEXP(f0, '(')", "Invalid regular expression 
for REGEXP:"));
+    }
+
     private Stream<TestSetSpec> regexpCountTestCases() {
         return Stream.of(
                 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 1932a6db862..54c45f0a4f1 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -1015,14 +1015,19 @@ public class SqlFunctionUtils {
         return Math.abs(str.hashCode());
     }
 
-    public static Boolean regExp(String s, String regex) {
-        if (regex.length() == 0) {
+    /**
+     * Returns whether {@code s} contains a match for the regular expression 
{@code regex}. Literal
+     * regexes are validated at planning time by the input type strategy.
+     */
+    public static boolean regExp(String s, String regex) {
+        if (regex.isEmpty()) {
             return false;
         }
         try {
-            return (REGEXP_PATTERN_CACHE.get(regex)).matcher(s).find(0);
-        } catch (Exception e) {
-            LOG.error("Exception when compile and match regex:" + regex + " 
on: " + s, e);
+            return REGEXP_PATTERN_CACHE.get(regex).matcher(s).find(0);
+        } catch (PatternSyntaxException e) {
+            // Literals are rejected at planning time; non-literal invalid 
regex
+            // returns false to preserve the prior runtime contract.
             return false;
         }
     }

Reply via email to