This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new aa079c5bab9 [FLINK-39649][table] `REGEXP_EXTRACT` plan-time validation 
and hot-path log cleanup
aa079c5bab9 is described below

commit aa079c5bab96ef96e92d5b70afa6b4e052ef0e78
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue May 12 18:23:44 2026 +0200

    [FLINK-39649][table] `REGEXP_EXTRACT` plan-time validation and hot-path log 
cleanup
---
 .../pyflink/table/tests/test_expression.py         |   4 +-
 .../functions/BuiltInFunctionDefinitions.java      |  14 +--
 .../strategies/RegexpExtractInputTypeStrategy.java | 117 +++++++++++++++++++++
 .../strategies/SpecificInputTypeStrategies.java    |   3 +
 .../RegexpExtractInputTypeStrategyTest.java        |  60 +++++++++++
 .../expressions/converter/DirectConvertRule.java   |   2 -
 .../functions/sql/FlinkSqlOperatorTable.java       |  14 ---
 .../table/planner/codegen/ExprCodeGenerator.scala  |   3 +
 .../planner/codegen/calls/StringCallGen.scala      |   2 -
 .../planner/functions/RegexpFunctionsITCase.java   |  38 +++++--
 .../table/runtime/functions/SqlFunctionUtils.java  |  22 ++--
 11 files changed, 229 insertions(+), 50 deletions(-)

diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 543566c1e8c..47486894fb2 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -195,8 +195,8 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         # regexp functions
         self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
         self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
-        self.assertEqual('regexpExtract(a, b)', 
str(expr1.regexp_extract(expr2)))
-        self.assertEqual('regexpExtract(a, b, 3)', 
str(expr1.regexp_extract(expr2, 3)))
+        self.assertEqual('REGEXP_EXTRACT(a, b)', 
str(expr1.regexp_extract(expr2)))
+        self.assertEqual('REGEXP_EXTRACT(a, b, 3)', 
str(expr1.regexp_extract(expr2, 3)))
         self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', 
str(expr1.regexp_extract_all(expr2)))
         self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', 
str(expr1.regexp_extract_all(expr2, 3)))
         self.assertEqual("regexpReplace(a, b, 'abc')", 
str(expr1.regexp_replace(expr2, 'abc')))
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 1085afa48b2..6d17bf880e8 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1387,19 +1387,11 @@ public final class BuiltInFunctionDefinitions {
 
     public static final BuiltInFunctionDefinition REGEXP_EXTRACT =
             BuiltInFunctionDefinition.newBuilder()
-                    .name("regexpExtract")
-                    .sqlName("REGEXP_EXTRACT")
+                    .name("REGEXP_EXTRACT")
                     .kind(SCALAR)
-                    .inputTypeStrategy(
-                            or(
-                                    sequence(
-                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
-                                            
logical(LogicalTypeFamily.CHARACTER_STRING)),
-                                    sequence(
-                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
-                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
-                                            logical(LogicalTypeRoot.INTEGER))))
+                    
.inputTypeStrategy(SpecificInputTypeStrategies.REGEXP_EXTRACT)
                     
.outputTypeStrategy(explicit(DataTypes.STRING().nullable()))
+                    .runtimeProvided()
                     .build();
 
     public static final BuiltInFunctionDefinition REGEXP_EXTRACT_ALL =
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
new file mode 100644
index 00000000000..e95620105e7
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategy.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.ArgumentTypeStrategy;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+import static 
org.apache.flink.table.types.inference.InputTypeStrategies.logical;
+
+/**
+ * Input type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. 
Validates literal
+ * regex patterns at planning time.
+ */
+@Internal
+public class RegexpExtractInputTypeStrategy implements InputTypeStrategy {
+
+    private static final int ARG_STR = 0;
+    private static final int ARG_REGEX = 1;
+    private static final int ARG_EXTRACT_INDEX = 2;
+
+    private static final ArgumentTypeStrategy STRING_ARG =
+            logical(LogicalTypeFamily.CHARACTER_STRING);
+    private static final ArgumentTypeStrategy INT_ARG = 
logical(LogicalTypeRoot.INTEGER);
+
+    @Override
+    public ArgumentCount getArgumentCount() {
+        return ConstantArgumentCount.between(2, 3);
+    }
+
+    @Override
+    public Optional<List<DataType>> inferInputTypes(
+            final CallContext callContext, final boolean throwOnFailure) {
+        if (STRING_ARG.inferArgumentType(callContext, ARG_STR, 
throwOnFailure).isEmpty()) {
+            return Optional.empty();
+        }
+        if (STRING_ARG.inferArgumentType(callContext, ARG_REGEX, 
throwOnFailure).isEmpty()) {
+            return Optional.empty();
+        }
+        if (callContext.getArgumentDataTypes().size() > ARG_EXTRACT_INDEX
+                && INT_ARG.inferArgumentType(callContext, ARG_EXTRACT_INDEX, 
throwOnFailure)
+                        .isEmpty()) {
+            return Optional.empty();
+        }
+
+        final Optional<List<DataType>> patternError =
+                validateLiteralPattern(callContext, throwOnFailure);
+        if (patternError.isPresent()) {
+            return patternError;
+        }
+
+        return Optional.of(callContext.getArgumentDataTypes());
+    }
+
+    private static Optional<List<DataType>> validateLiteralPattern(
+            final CallContext callContext, final boolean throwOnFailure) {
+        if (!callContext.isArgumentLiteral(ARG_REGEX) || 
callContext.isArgumentNull(ARG_REGEX)) {
+            return Optional.empty();
+        }
+        final Optional<String> pattern = 
callContext.getArgumentValue(ARG_REGEX, String.class);
+        if (pattern.isEmpty()) {
+            return Optional.empty();
+        }
+        try {
+            Pattern.compile(pattern.get());
+            return Optional.empty();
+        } catch (PatternSyntaxException e) {
+            return callContext.fail(
+                    throwOnFailure,
+                    "Invalid regular expression for REGEXP_EXTRACT: %s",
+                    e.getMessage());
+        }
+    }
+
+    @Override
+    public List<Signature> getExpectedSignatures(FunctionDefinition 
definition) {
+        return List.of(
+                Signature.of(
+                        Argument.ofGroup("str", 
LogicalTypeFamily.CHARACTER_STRING),
+                        Argument.ofGroup("regex", 
LogicalTypeFamily.CHARACTER_STRING)),
+                Signature.of(
+                        Argument.ofGroup("str", 
LogicalTypeFamily.CHARACTER_STRING),
+                        Argument.ofGroup("regex", 
LogicalTypeFamily.CHARACTER_STRING),
+                        Argument.ofGroup("extractIndex", 
LogicalTypeRoot.INTEGER)));
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
index b4cd96e0349..81d689a550c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java
@@ -194,6 +194,9 @@ public final class SpecificInputTypeStrategies {
     /** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. 
*/
     public static final InputTypeStrategy TO_TIMESTAMP_LTZ = new 
ToTimestampLtzInputTypeStrategy();
 
+    /** Type strategy for {@link BuiltInFunctionDefinitions#REGEXP_EXTRACT}. */
+    public static final InputTypeStrategy REGEXP_EXTRACT = new 
RegexpExtractInputTypeStrategy();
+
     private SpecificInputTypeStrategies() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
new file mode 100644
index 00000000000..8e92023f881
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/RegexpExtractInputTypeStrategyTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.inference.InputTypeStrategiesTestBase;
+
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.REGEXP_EXTRACT;
+
+/** Tests for {@link RegexpExtractInputTypeStrategy}. */
+class RegexpExtractInputTypeStrategyTest extends InputTypeStrategiesTestBase {
+
+    @Override
+    protected Stream<TestSpec> testData() {
+        return Stream.of(
+                // Non-literal regex skips the plan-time compile check and is 
deferred to runtime.
+                TestSpec.forStrategy("Non-literal regex defers compile to 
runtime", REGEXP_EXTRACT)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+
+                // Valid literal regex compiles cleanly at plan time.
+                TestSpec.forStrategy("Valid literal regex compiles", 
REGEXP_EXTRACT)
+                        .calledWithArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.INT())
+                        .calledWithLiteralAt(1, "foo(.*?)bar")
+                        .expectArgumentTypes(
+                                DataTypes.STRING(), DataTypes.STRING(), 
DataTypes.INT()),
+
+                // Null literal regex short-circuits the plan-time check; 
runtime returns null.
+                TestSpec.forStrategy("Null regex literal is deferred", 
REGEXP_EXTRACT)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .calledWithLiteralAt(1, null)
+                        .expectArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING()),
+
+                // Invalid literal regex surfaces as a ValidationException at 
plan time
+                // instead of producing one log line per record at runtime.
+                TestSpec.forStrategy("Invalid literal regex fails at plan 
time", REGEXP_EXTRACT)
+                        .calledWithArgumentTypes(DataTypes.STRING(), 
DataTypes.STRING())
+                        .calledWithLiteralAt(1, "(")
+                        .expectErrorMessage("Invalid regular expression for 
REGEXP_EXTRACT:"));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 007bcb2a8de..c8ee1cede21 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -166,8 +166,6 @@ public class DirectConvertRule implements 
CallExpressionConvertRule {
                 BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD);
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD);
-        definitionSqlOperatorHashMap.put(
-                BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
         definitionSqlOperatorHashMap.put(
                 BuiltInFunctionDefinitions.FROM_BASE64, 
FlinkSqlOperatorTable.FROM_BASE64);
         definitionSqlOperatorHashMap.put(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 817284846d1..55dbcfa22cd 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -508,20 +508,6 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
-    public static final SqlFunction REGEXP_EXTRACT =
-            new SqlFunction(
-                    "REGEXP_EXTRACT",
-                    SqlKind.OTHER_FUNCTION,
-                    VARCHAR_FORCE_NULLABLE,
-                    null,
-                    OperandTypes.or(
-                            OperandTypes.family(
-                                    SqlTypeFamily.CHARACTER,
-                                    SqlTypeFamily.CHARACTER,
-                                    SqlTypeFamily.INTEGER),
-                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
-                    SqlFunctionCategory.STRING);
-
     public static final SqlFunction HASH_CODE =
             new SqlFunction(
                     "HASH_CODE",
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 1de3ca9bee8..c6abee0315a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -941,6 +941,9 @@ class ExprCodeGenerator(
           case BuiltInFunctionDefinitions.JSON =>
             new JsonCallGen().generate(ctx, operands, 
FlinkTypeFactory.toLogicalType(call.getType))
 
+          case BuiltInFunctionDefinitions.REGEXP_EXTRACT =>
+            StringCallGen.generateRegexpExtract(ctx, operands, resultType)
+
           case _ =>
             new BridgingSqlFunctionCallGen(call, rexProgram).generate(ctx, 
operands, resultType)
         }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 71ef476424a..6cc0dfad092 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -79,8 +79,6 @@ object StringCallGen {
       case NOT_SIMILAR_TO =>
         generateNot(ctx, generateSimilarTo(ctx, operands, returnType), 
returnType)
 
-      case REGEXP_EXTRACT => generateRegexpExtract(ctx, operands, returnType)
-
       case REGEXP_REPLACE => generateRegexpReplace(ctx, operands, returnType)
 
       case IS_DECIMAL => generateIsDecimal(ctx, operands, returnType)
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index c5a63826c7e..0a39f11028e 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.Expressions.$;
-import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.concat;
 import static org.apache.flink.table.api.Expressions.lit;
 
 /** Test Regexp functions correct behaviour. */
@@ -121,17 +121,43 @@ class RegexpFunctionsITCase extends 
BuiltInFunctionTestBase {
         return Stream.of(
                 TestSetSpec.forFunction(
                                 BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
"Check return type")
-                        .onFieldsWithData("22", "ABC")
+                        .onFieldsWithData("22", "ABC", "(")
                         .testResult(
-                                call("regexpExtract", $("f0"), "[A-Z]+"),
-                                "REGEXP_EXTRACT(f0,'[A-Z]+')",
+                                $("f0").regexpExtract("[A-Z]+"),
+                                "REGEXP_EXTRACT(f0, '[A-Z]+')",
                                 null,
                                 DataTypes.STRING().nullable())
                         .testResult(
-                                call("regexpExtract", $("f1"), "[A-Z]+"),
+                                $("f1").regexpExtract("[A-Z]+"),
                                 "REGEXP_EXTRACT(f1, '[A-Z]+')",
                                 "ABC",
-                                DataTypes.STRING().nullable()));
+                                DataTypes.STRING().nullable())
+                        .testResult(
+                                $("f1").regexpExtract($("f2")),
+                                "REGEXP_EXTRACT(f1, f2)",
+                                null,
+                                DataTypes.STRING().nullable())
+                        .testResult(
+                                $("f1").regexpExtract(concat("[A-", "Z]+")),
+                                "REGEXP_EXTRACT(f1, '[A-' || 'Z]+')",
+                                "ABC",
+                                DataTypes.STRING().nullable())
+                        .testResult(
+                                $("f1").regexpExtract(concat("(", "")),
+                                "REGEXP_EXTRACT(f1, '(' || '')",
+                                null,
+                                DataTypes.STRING().nullable()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.REGEXP_EXTRACT,
+                                "Invalid literal regex fails at plan time")
+                        .onFieldsWithData("ABC")
+                        .andDataTypes(DataTypes.STRING())
+                        .testTableApiValidationError(
+                                $("f0").regexpExtract("("),
+                                "Invalid regular expression for 
REGEXP_EXTRACT:")
+                        .testSqlValidationError(
+                                "REGEXP_EXTRACT(f0, '(')",
+                                "Invalid regular expression for 
REGEXP_EXTRACT:"));
     }
 
     private Stream<TestSetSpec> regexpExtractAllTestCases() {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
index 0fdd5213613..2f90133f744 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlFunctionUtils.java
@@ -46,7 +46,6 @@ import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.regex.MatchResult;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
@@ -444,26 +443,23 @@ public class SqlFunctionUtils {
 
     /**
      * Returns a string extracted with a specified regular expression and a 
regex match group index.
+     * Literal regexes are validated at planning time by the input type 
strategy.
      */
     public static String regexpExtract(String str, String regex, int 
extractIndex) {
-        if (str == null || regex == null) {
+        if (str == null || regex == null || extractIndex < 0) {
             return null;
         }
-
         try {
-            Matcher m = Pattern.compile(regex).matcher(str);
+            final Matcher m = REGEXP_PATTERN_CACHE.get(regex).matcher(str);
+            if (m.groupCount() < extractIndex) {
+                return null;
+            }
             if (m.find()) {
-                MatchResult mr = m.toMatchResult();
-                return mr.group(extractIndex);
+                return m.group(extractIndex);
             }
-        } catch (Exception e) {
-            LOG.error(
-                    String.format(
-                            "Exception in regexpExtract('%s', '%s', '%d')",
-                            str, regex, extractIndex),
-                    e);
+        } catch (PatternSyntaxException e) {
+            // non-literal invalid regex returns null.
         }
-
         return null;
     }
 

Reply via email to