This is an automated email from the ASF dual-hosted git repository.

AHeise pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3c7a45a61b3394790cab531205d576976ae50da7
Author: Ramin Gharib <[email protected]>
AuthorDate: Mon May 18 15:00:38 2026 +0200

    [FLINK-39650][table] Validate REGEXP_REPLACE literal patterns at plan time
    
    Adds StrategyUtils.validateLiteralPattern as a shared helper for the regex
    function family. It eagerly compiles literal regex arguments during type
    inference and surfaces failures as ValidationException via callContext.fail.
    Non-literal or null-literal arguments are deferred to runtime.
    
    Wires REGEXP_REPLACE through the new RegexpReplaceInputTypeStrategy, which
    validates three character-string arguments and delegates the literal-pattern
    check to the shared helper. REGEXP_EXTRACT is refactored to use the same
    helper.
    
    Tests: new RegexpReplaceInputTypeStrategyTest covers the four branches
    (non-literal defers, valid literal compiles, null literal is deferred, invalid
    literal fails). RegexpFunctionsITCase gains a plan-time validation TestSetSpec
    for REGEXP_REPLACE covering both Table API and SQL paths.
---
 .../functions/BuiltInFunctionDefinitions.java      |  6 +--
 .../strategies/RegexpExtractInputTypeStrategy.java | 24 +-----------
 ...gy.java => RegexpReplaceInputTypeStrategy.java} | 43 ++++------------------
 .../strategies/SpecificInputTypeStrategies.java    |  3 ++
 .../types/inference/strategies/StrategyUtils.java  | 32 ++++++++++++++++
 .../RegexpExtractInputTypeStrategyTest.java        |  2 +-
 ...ava => RegexpReplaceInputTypeStrategyTest.java} | 38 ++++++++++---------
 .../planner/functions/RegexpFunctionsITCase.java   | 13 ++++++-
 8 files changed, 78 insertions(+), 83 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index cbf39763cdb..7a25ae6058a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1680,11 +1680,7 @@ public final class BuiltInFunctionDefinitions {
             BuiltInFunctionDefinition.newBuilder()
                     .name("REGEXP_REPLACE")
                     .kind(SCALAR)
-                    .inputTypeStrategy(
-                            sequence(
-                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
-                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
-                                    
logical(LogicalTypeFamily.CHARACTER_STRING)))
+                    
.inputTypeStrategy(SpecificInputTypeStrategies.REGEXP_REPLACE)
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
                     .runtimeProvided()
                     .build();
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
index e95620105e7..a6330757487 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
@@ -34,8 +34,6 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
 
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
 
@@ -75,7 +73,7 @@ public class RegexpExtractInputTypeStrategy implements 
InputTypeStrategy {
         }
 
         final Optional<List<DataType>> patternError =
-                validateLiteralPattern(callContext, throwOnFailure);
+                StrategyUtils.validateLiteralPattern(callContext, ARG_REGEX, 
throwOnFailure);
         if (patternError.isPresent()) {
             return patternError;
         }
@@ -83,26 +81,6 @@ public class RegexpExtractInputTypeStrategy implements 
InputTypeStrategy {
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
-    private static Optional<List<DataType>> validateLiteralPattern(
-            final CallContext callContext, final boolean throwOnFailure) {
-        if (!callContext.isArgumentLiteral(ARG_REGEX) || 
callContext.isArgumentNull(ARG_REGEX)) {
-            return Optional.empty();
-        }
-        final Optional<String> pattern = 
callContext.getArgumentValue(ARG_REGEX, String.class);
-        if (pattern.isEmpty()) {
-            return Optional.empty();
-        }
-        try {
-            Pattern.compile(pattern.get());
-            return Optional.empty();
-        } catch (PatternSyntaxException e) {
-            return callContext.fail(
-                    throwOnFailure,
-                    "Invalid regular expression for REGEXP_EXTRACT: %s",
-                    e.getMessage());
-        }
-    }
-
     @Override
     public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
         return List.of(
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategy.java
similarity index 63%
copy from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
copy to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategy.java
index e95620105e7..3e31fb4a228 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategy.java
@@ -30,33 +30,29 @@ import 
org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.Signature;
 import org.apache.flink.table.types.inference.Signature.Argument;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
 import java.util.List;
 import java.util.Optional;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
 
 import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
 
 /**
- * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. 
Validates literal
+ * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP_REPLACE}. 
Validates literal
  * regex patterns at planning time.
  */
 @Internal
-public class RegexpExtractInputTypeStrategy implements InputTypeStrategy {
+public class RegexpReplaceInputTypeStrategy implements InputTypeStrategy {
 
     private static final int ARG_STR = 0;
     private static final int ARG_REGEX = 1;
-    private static final int ARG_EXTRACT_INDEX = 2;
+    private static final int ARG_REPLACEMENT = 2;
 
     private static final ArgumentTypeStrategy STRING_ARG =
             logical(LogicalTypeFamily.CHARACTER_STRING);
-    private static final ArgumentTypeStrategy INT_ARG = 
logical(LogicalTypeRoot.INTEGER);
 
     @Override
     public ArgumentCount getArgumentCount() {
-        return ConstantArgumentCount.between(2, 3);
+        return ConstantArgumentCount.of(3);
     }
 
     @Override
@@ -68,14 +64,12 @@ public class RegexpExtractInputTypeStrategy implements 
InputTypeStrategy {
         if (STRING_ARG.inferArgumentType(callContext, ARG_REGEX, 
throwOnFailure).isEmpty()) {
             return Optional.empty();
         }
-        if (callContext.getArgumentDataTypes().size() > ARG_EXTRACT_INDEX
-                && INT_ARG.inferArgumentType(callContext, ARG_EXTRACT_INDEX, 
throwOnFailure)
-                        .isEmpty()) {
+        if (STRING_ARG.inferArgumentType(callContext, ARG_REPLACEMENT, 
throwOnFailure).isEmpty()) {
             return Optional.empty();
         }
 
         final Optional<List<DataType>> patternError =
-                validateLiteralPattern(callContext, throwOnFailure);
+                StrategyUtils.validateLiteralPattern(callContext, ARG_REGEX, 
throwOnFailure);
         if (patternError.isPresent()) {
             return patternError;
         }
@@ -83,35 +77,12 @@ public class RegexpExtractInputTypeStrategy implements 
InputTypeStrategy {
         return Optional.of(callContext.getArgumentDataTypes());
     }
 
-    private static Optional<List<DataType>> validateLiteralPattern(
-            final CallContext callContext, final boolean throwOnFailure) {
-        if (!callContext.isArgumentLiteral(ARG_REGEX) || 
callContext.isArgumentNull(ARG_REGEX)) {
-            return Optional.empty();
-        }
-        final Optional<String> pattern = 
callContext.getArgumentValue(ARG_REGEX, String.class);
-        if (pattern.isEmpty()) {
-            return Optional.empty();
-        }
-        try {
-            Pattern.compile(pattern.get());
-            return Optional.empty();
-        } catch (PatternSyntaxException e) {
-            return callContext.fail(
-                    throwOnFailure,
-                    "Invalid regular expression for REGEXP_EXTRACT: %s",
-                    e.getMessage());
-        }
-    }
-
     @Override
     public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
         return List.of(
-                Signature.of(
-                        Argument.ofGroup("str", 
LogicalTypeFamily.CHARACTER_STRING),
-                        Argument.ofGroup("regex", 
LogicalTypeFamily.CHARACTER_STRING)),
                 Signature.of(
                         Argument.ofGroup("str", 
LogicalTypeFamily.CHARACTER_STRING),
                         Argument.ofGroup("regex", 
LogicalTypeFamily.CHARACTER_STRING),
-                        Argument.ofGroup("extractIndex", 
LogicalTypeRoot.INTEGER)));
+                        Argument.ofGroup("replacement", 
LogicalTypeFamily.CHARACTER_STRING)));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index 81d689a550c..fc6cd13177f 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -197,6 +197,9 @@ public final class SpecificInputTypeStrategies {
     /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. */
     public static final InputTypeStrategy REGEXP_EXTRACT = new 
RegexpExtractInputTypeStrategy();
 
+    /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_REPLACE}. */
+    public static final InputTypeStrategy REGEXP_REPLACE = new 
RegexpReplaceInputTypeStrategy();
+
     private SpecificInputTypeStrategies() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
index 3e79975de22..bc387ccae81 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/StrategyUtils.java
@@ -37,8 +37,11 @@ import org.apache.flink.table.types.utils.TypeConversions;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
 
 import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.APPROXIMATE_NUMERIC;
 import static 
org.apache.flink.table.types.logical.LogicalTypeFamily.EXACT_NUMERIC;
@@ -219,6 +222,35 @@ final class StrategyUtils {
         return Optional.empty();
     }
 
+    /**
+     * Compiles the argument at {@code regexArgPos} as a Java regex if it is a non-null literal.
+     * Non-literal or null-literal regex arguments are deferred to runtime.
+     *
+     * @return {@link Optional#empty()} when the argument is not a literal, is a {@code NULL}
+     *     literal, has no string value, or compiles; else the result of {@link CallContext#fail}.
+     */
+    static Optional<List<DataType>> validateLiteralPattern(
+            final CallContext callContext, final int regexArgPos, final 
boolean throwOnFailure) {
+        if (!callContext.isArgumentLiteral(regexArgPos)
+                || callContext.isArgumentNull(regexArgPos)) {
+            return Optional.empty();
+        }
+        final Optional<String> pattern = 
callContext.getArgumentValue(regexArgPos, String.class);
+        if (pattern.isEmpty()) {
+            return Optional.empty();
+        }
+        try {
+            Pattern.compile(pattern.get());
+            return Optional.empty();
+        } catch (PatternSyntaxException e) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Invalid regular expression for %s: %s",
+                    callContext.getName(),
+                    e.getMessage());
+        }
+    }
+
     private StrategyUtils() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
index 8e92023f881..7e514eeb390 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
@@ -55,6 +55,6 @@ class RegexpExtractInputTypeStrategyTest extends 
InputTypeStrategiesTestBase {
                 TestSpec.forStrategy("Invalid literal regex fails at plan 
time", REGEXP_EXTRACT)
                         .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
                         .calledWithLiteralAt(1, "(")
-                        .expectErrorMessage("Invalid regular expression for 
REGEXP_EXTRACT:"));
+                        .expectErrorMessage("Invalid regular expression for"));
     }
 }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategyTest.java
similarity index 67%
copy from 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
copy to 
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategyTest.java
index 8e92023f881..6b8c219f96b 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpReplaceInputTypeStrategyTest.java
@@ -23,38 +23,42 @@ import 
org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
 
 import java.util.stream.Stream;
 
-import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP_EXTRACT;
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP_REPLACE;
 
-/** Tests for {@link RegexpExtractInputTypeStrategy}. */
-class RegexpExtractInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+/** Tests for {@link RegexpReplaceInputTypeStrategy}. */
+class RegexpReplaceInputTypeStrategyTest extends InputTypeStrategiesTestBase {
 
     @Override
     protected Stream<TestSpec> testData() {
         return Stream.of(
                 // Non-literal regex skips the plan-time compile check and is 
deferred to runtime.
-                TestSpec.forStrategy("Non-literal regex defers compile to 
runtime", REGEXP_EXTRACT)
-                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
-                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+                TestSpec.forStrategy("Non-literal regex defers compile to 
runtime", REGEXP_REPLACE)
+                        .calledWithArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
+                        .expectArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()),
 
                 // Valid literal regex compiles cleanly at plan time.
-                TestSpec.forStrategy("Valid literal regex compiles", 
REGEXP_EXTRACT)
+                TestSpec.forStrategy("Valid literal regex compiles", 
REGEXP_REPLACE)
                         .calledWithArgumentTypes(
-                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.INT())
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
                         .calledWithLiteralAt(1, "foo(.*?)bar")
                         .expectArgumentTypes(
-                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.INT()),
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()),
 
                 // Null literal regex short-circuits the plan-time check; 
runtime returns null.
-                TestSpec.forStrategy("Null regex literal is deferred", 
REGEXP_EXTRACT)
-                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                TestSpec.forStrategy("Null regex literal is deferred", 
REGEXP_REPLACE)
+                        .calledWithArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
                         .calledWithLiteralAt(1, null)
-                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+                        .expectArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING()),
 
-                // Invalid literal regex surfaces as a ValidationException at 
plan time
-                // instead of producing one log line per record at runtime.
-                TestSpec.forStrategy("Invalid literal regex fails at plan 
time", REGEXP_EXTRACT)
-                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                // Invalid literal regex surfaces as a ValidationException at 
plan time.
+                TestSpec.forStrategy("Invalid literal regex fails at plan 
time", REGEXP_REPLACE)
+                        .calledWithArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.STRING())
                         .calledWithLiteralAt(1, "(")
-                        .expectErrorMessage("Invalid regular expression for 
REGEXP_EXTRACT:"));
+                        .expectErrorMessage("Invalid regular expression for"));
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index 85c5ef49d61..e62010bd0c1 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -375,7 +375,18 @@ class RegexpFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 $("f1").regexpReplace(concat("(", ""), "X"),
                                 "REGEXP_REPLACE(f1, '(' || '', 'X')",
                                 null,
-                                DataTypes.STRING().nullable()));
+                                DataTypes.STRING().nullable()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.REGEXP_REPLACE,
+                                "Invalid literal regex fails at plan time")
+                        .onFieldsWithData("foobar")
+                        .andDataTypes(DataTypes.STRING())
+                        .testTableApiValidationError(
+                                $("f0").regexpReplace("(", "X"),
+                                "Invalid regular expression for 
REGEXP_REPLACE:")
+                        .testSqlValidationError(
+                                "REGEXP_REPLACE(f0, '(', 'X')",
+                                "Invalid regular expression for 
REGEXP_REPLACE:"));
     }
 
     private Stream<TestSetSpec> regexpSubstrTestCases() {

Reply via email to