Copilot commented on code in PR #4227:
URL: https://github.com/apache/flink-cdc/pull/4227#discussion_r2703012491


##########
flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeName.java:
##########
@@ -0,0 +1,1059 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.type;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+import org.apache.calcite.util.Util;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.sql.Types;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Enumeration of the type names which can be used to construct a SQL type. Rationale for this
+ * class's existence (instead of just using the standard java.sql.Type ordinals):
+ *
+ * <ul>
+ *   <li>{@link Types} does not include all SQL2003 data-types;
+ *   <li>SqlTypeName provides a type-safe enumeration;
+ *   <li>SqlTypeName provides a place to hang extra information such as whether the type carries
+ *       precision and scale.
+ * </ul>
+ *
+ * <p>This class was copied over from Calcite to support variant type(CALCITE-4918). When upgrading
+ * to Calcite 1.39.0 version, please remove the entire class.
+ *
+ * <p>changList: 1. Add variant type: Line 167~171, Line 236.

Review Comment:
   The word "changList" should be spelled "changeList" in the documentation comment.
   ```suggestion
    * <p>changeList: 1. Add variant type: Line 167~171, Line 236.
   ```



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java:
##########
@@ -126,6 +128,42 @@ void testBuildInFunction() throws InvocationTargetException {
                         Double.class);
         Object evaluate = expressionEvaluator.evaluate(params.toArray());
         Assertions.assertThat(evaluate).isEqualTo(3.0);
+
+        String jsonStr =
+                "{\"name\":\"张三\",\"age\":30,\"is_active\":true,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"address\":{\"street\":\"MainSt\",\"city\":\"Beijing\",\"zip\":\"100000\"}}";
+        expressionEvaluator =
+                JaninoCompiler.compileExpression(
+                        JaninoCompiler.loadSystemFunction("parseJson(testJsonStr)"),
+                        List.of("testJsonStr"),
+                        List.of(String.class),
+                        Variant.class);
+        evaluate = expressionEvaluator.evaluate(new Object[] {jsonStr});
+        Assertions.assertThat(evaluate)
+                .isEqualTo(BinaryVariantInternalBuilder.parseJson(jsonStr, false));
+
+        final String invalidJsonStr = "invalidJson";
+        Assertions.assertThatThrownBy(
+                        () -> {
+                            JaninoCompiler.compileExpression(
+                                            JaninoCompiler.loadSystemFunction(
+                                                    "parseJson(testJsonStr)"),
+                                            List.of("testJsonStr"),
+                                            List.of(String.class),
+                                            Variant.class)
+                                    .evaluate(new Object[] {invalidJsonStr});
+                        })
+                .cause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Failed to parse json string");
+
+        expressionEvaluator =
+                JaninoCompiler.compileExpression(
+                        JaninoCompiler.loadSystemFunction("tryParseJson(testJsonStr)"),
+                        List.of("testJsonStr"),
+                        List.of(String.class),
+                        Variant.class);
+        evaluate = expressionEvaluator.evaluate(new Object[] {invalidJsonStr});
+        Assertions.assertThat(evaluate).isEqualTo(null);
     }

Review Comment:
   The test coverage for parseJson and tryParseJson is missing test cases for empty strings. Since both functions return null for empty strings, add test cases to verify this behavior explicitly, especially to ensure it's intentional and documented.
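   A minimal sketch of what such cases could look like, reusing the compilation pattern already present in testBuildInFunction and assuming the current null-on-empty behavior is the intended contract:
   ```java
   // Hypothetical additions to testBuildInFunction(): pin down the empty-string
   // behavior (both functions currently return null for "").
   evaluate =
           JaninoCompiler.compileExpression(
                           JaninoCompiler.loadSystemFunction("parseJson(testJsonStr)"),
                           List.of("testJsonStr"),
                           List.of(String.class),
                           Variant.class)
                   .evaluate(new Object[] {""});
   Assertions.assertThat(evaluate).isNull();

   evaluate =
           JaninoCompiler.compileExpression(
                           JaninoCompiler.loadSystemFunction("tryParseJson(testJsonStr)"),
                           List.of("testJsonStr"),
                           List.of(String.class),
                           Variant.class)
                   .evaluate(new Object[] {""});
   Assertions.assertThat(evaluate).isNull();
   ```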



##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java:
##########
@@ -126,6 +128,42 @@ void testBuildInFunction() throws InvocationTargetException {
                         Double.class);
         Object evaluate = expressionEvaluator.evaluate(params.toArray());
         Assertions.assertThat(evaluate).isEqualTo(3.0);
+
+        String jsonStr =
+                "{\"name\":\"张三\",\"age\":30,\"is_active\":true,\"email\":\"[email protected]\",\"hobbies\":[\"reading\",\"coding\",\"traveling\"],\"address\":{\"street\":\"MainSt\",\"city\":\"Beijing\",\"zip\":\"100000\"}}";
+        expressionEvaluator =
+                JaninoCompiler.compileExpression(
+                        JaninoCompiler.loadSystemFunction("parseJson(testJsonStr)"),
+                        List.of("testJsonStr"),
+                        List.of(String.class),
+                        Variant.class);
+        evaluate = expressionEvaluator.evaluate(new Object[] {jsonStr});
+        Assertions.assertThat(evaluate)
+                .isEqualTo(BinaryVariantInternalBuilder.parseJson(jsonStr, false));
+
+        final String invalidJsonStr = "invalidJson";
+        Assertions.assertThatThrownBy(
+                        () -> {
+                            JaninoCompiler.compileExpression(
+                                            JaninoCompiler.loadSystemFunction(
+                                                    "parseJson(testJsonStr)"),
+                                            List.of("testJsonStr"),
+                                            List.of(String.class),
+                                            Variant.class)
+                                    .evaluate(new Object[] {invalidJsonStr});
+                        })
+                .cause()
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Failed to parse json string");
+
+        expressionEvaluator =
+                JaninoCompiler.compileExpression(
+                        JaninoCompiler.loadSystemFunction("tryParseJson(testJsonStr)"),
+                        List.of("testJsonStr"),
+                        List.of(String.class),
+                        Variant.class);
+        evaluate = expressionEvaluator.evaluate(new Object[] {invalidJsonStr});
+        Assertions.assertThat(evaluate).isEqualTo(null);
     }

Review Comment:
   Test coverage is missing for the two-parameter versions of parseJson and tryParseJson that accept the allowDuplicateKeys parameter. Add tests to verify the behavior when duplicate keys are encountered with this parameter set to both true and false.
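   A possible sketch, under a few assumptions: SystemFunctionUtils is called directly (to sidestep how a boolean literal is passed through a transform expression), BinaryVariantInternalBuilder rejects duplicate keys when allowDuplicateKeys is false, and the duplicate-key input is made up for illustration:
   ```java
   // Hypothetical checks of the two-argument overloads; assumes an import of
   // org.apache.flink.cdc.runtime.functions.SystemFunctionUtils in this test class.
   final String duplicateKeyJson = "{\"k\":1,\"k\":2}";

   // With allowDuplicateKeys = true the input is expected to parse into a Variant.
   Assertions.assertThat(SystemFunctionUtils.parseJson(duplicateKeyJson, true)).isNotNull();
   Assertions.assertThat(SystemFunctionUtils.tryParseJson(duplicateKeyJson, true)).isNotNull();

   // With allowDuplicateKeys = false, parseJson should surface the failure while
   // tryParseJson should swallow it and return null.
   Assertions.assertThatThrownBy(() -> SystemFunctionUtils.parseJson(duplicateKeyJson, false))
           .isExactlyInstanceOf(IllegalArgumentException.class)
           .hasMessageContaining("Failed to parse json string");
   Assertions.assertThat(SystemFunctionUtils.tryParseJson(duplicateKeyJson, false)).isNull();
   ```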



##########
flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeFamily.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.type;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFamily;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nullable;
+
+import java.sql.Types;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SqlTypeFamily provides SQL type categorization.
+ *
+ * <p>The <em>primary</em> family categorization is a complete disjoint partitioning of SQL types
+ * into families, where two types are members of the same primary family iff instances of the two
+ * types can be the operands of an SQL equality predicate such as <code>WHERE v1 = v2</code>.
+ * Primary families are returned by RelDataType.getFamily().
+ *
+ * <p>There is also a <em>secondary</em> family categorization which overlaps with the primary
+ * categorization. It is used in type strategies for more specific or more general categorization
+ * than the primary families. Secondary families are never returned by RelDataType.getFamily().
+ *
+ * <p>This class was copied over from Calcite to support variant type(CALCITE-4918). When upgrading
+ * to Calcite 1.39.0 version, please remove the entire class.
+ *
+ * <p>changList: 1. Add variant type: Line 84, Line 120, Line 218~219.
+ */
+public enum SqlTypeFamily implements RelDataTypeFamily {
+    // Primary families.
+    CHARACTER,
+    BINARY,
+    NUMERIC,
+    DATE,
+    TIME,
+    TIMESTAMP,
+    BOOLEAN,
+    INTERVAL_YEAR_MONTH,
+    INTERVAL_DAY_TIME,
+
+    // Secondary families.
+
+    STRING,
+    APPROXIMATE_NUMERIC,
+    EXACT_NUMERIC,
+    DECIMAL,
+    INTEGER,
+    DATETIME,
+    DATETIME_INTERVAL,
+    MULTISET,
+    ARRAY,
+    MAP,
+    NULL,
+    ANY,
+    CURSOR,
+    COLUMN_LIST,
+    GEO,
+    VARIANT,
+    /** Like ANY, but do not even validate the operand. It may not be an expression. */
+    IGNORE;
+
+    private static final Map<Integer, SqlTypeFamily> JDBC_TYPE_TO_FAMILY =
+            ImmutableMap.<Integer, SqlTypeFamily>builder()
+                    // Not present:
+                    // SqlTypeName.MULTISET shares Types.ARRAY with SqlTypeName.ARRAY;
+                    // SqlTypeName.MAP has no corresponding JDBC type
+                    // SqlTypeName.COLUMN_LIST has no corresponding JDBC type
+                    .put(Types.BIT, NUMERIC)
+                    .put(Types.TINYINT, NUMERIC)
+                    .put(Types.SMALLINT, NUMERIC)
+                    .put(Types.BIGINT, NUMERIC)
+                    .put(Types.INTEGER, NUMERIC)
+                    .put(Types.NUMERIC, NUMERIC)
+                    .put(Types.DECIMAL, NUMERIC)
+                    .put(Types.FLOAT, NUMERIC)
+                    .put(Types.REAL, NUMERIC)
+                    .put(Types.DOUBLE, NUMERIC)
+                    .put(Types.CHAR, CHARACTER)
+                    .put(Types.VARCHAR, CHARACTER)
+                    .put(Types.LONGVARCHAR, CHARACTER)
+                    .put(Types.CLOB, CHARACTER)
+                    .put(Types.BINARY, BINARY)
+                    .put(Types.VARBINARY, BINARY)
+                    .put(Types.LONGVARBINARY, BINARY)
+                    .put(Types.BLOB, BINARY)
+                    .put(Types.DATE, DATE)
+                    .put(Types.TIME, TIME)
+                    .put(ExtraSqlTypes.TIME_WITH_TIMEZONE, TIME)
+                    .put(Types.TIMESTAMP, TIMESTAMP)
+                    .put(ExtraSqlTypes.TIMESTAMP_WITH_TIMEZONE, TIMESTAMP)
+                    .put(Types.BOOLEAN, BOOLEAN)
+                    .put(ExtraSqlTypes.REF_CURSOR, CURSOR)
+                    .put(Types.ARRAY, ARRAY)
+                    .put(Types.JAVA_OBJECT, VARIANT)
+                    .build();
+
+    /**
+     * Gets the primary family containing a JDBC type.
+     *
+     * @param jdbcType the JDBC type of interest
+     * @return containing family
+     */
+    public static @Nullable SqlTypeFamily getFamilyForJdbcType(int jdbcType) {
+        return JDBC_TYPE_TO_FAMILY.get(jdbcType);
+    }
+
+    /**
+     * For this type family, returns the allow types of the difference between two values of this
+     * family.
+     *
+     * <p>Equivalently, given an {@code ORDER BY} expression with one key, returns the allowable
+     * type families of the difference between two keys.
+     *
+     * <p>Example 1. For {@code ORDER BY empno}, a NUMERIC, the difference between two {@code empno}
+     * values is also NUMERIC.
+     *
+     * <p>Example 2. For {@code ORDER BY hireDate}, a DATE, the difference between two {@code
+     * hireDate} values might be an INTERVAL_DAY_TIME or INTERVAL_YEAR_MONTH.
+     *
+     * <p>The result determines whether a {@link SqlWindow} with a {@code RANGE} is valid (for
+     * example, {@code OVER (ORDER BY empno RANGE 10} is valid because {@code 10} is numeric); and
+     * whether a call to {@link org.apache.calcite.sql.fun.SqlStdOperatorTable#PERCENTILE_CONT
+     * PERCENTILE_CONT} is valid (for example, {@code PERCENTILE_CONT(0.25)} ORDER BY (hireDate)} is
+     * valid because {@code hireDate} values may be interpolated by adding values of type {@code
+     * INTERVAL_DAY_TIME}.
+     */
+    public List<SqlTypeFamily> allowableDifferenceTypes() {
+        switch (this) {
+            case NUMERIC:
+                return ImmutableList.of(NUMERIC);
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return ImmutableList.of(INTERVAL_DAY_TIME, INTERVAL_YEAR_MONTH);
+            default:
+                return ImmutableList.of();
+        }
+    }
+
+    /** Returns the collection of {@link SqlTypeName}s included in this family. */
+    public Collection<SqlTypeName> getTypeNames() {
+        switch (this) {
+            case CHARACTER:
+                return SqlTypeName.CHAR_TYPES;
+            case BINARY:
+                return SqlTypeName.BINARY_TYPES;
+            case NUMERIC:
+                return SqlTypeName.NUMERIC_TYPES;
+            case DECIMAL:
+                return ImmutableList.of(SqlTypeName.DECIMAL);
+            case DATE:
+                return ImmutableList.of(SqlTypeName.DATE);
+            case TIME:
+                return ImmutableList.of(SqlTypeName.TIME, SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE);
+            case TIMESTAMP:
+                return ImmutableList.of(
+                        SqlTypeName.TIMESTAMP, SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+            case BOOLEAN:
+                return SqlTypeName.BOOLEAN_TYPES;
+            case INTERVAL_YEAR_MONTH:
+                return SqlTypeName.YEAR_INTERVAL_TYPES;
+            case INTERVAL_DAY_TIME:
+                return SqlTypeName.DAY_INTERVAL_TYPES;
+            case STRING:
+                return SqlTypeName.STRING_TYPES;
+            case APPROXIMATE_NUMERIC:
+                return SqlTypeName.APPROX_TYPES;
+            case EXACT_NUMERIC:
+                return SqlTypeName.EXACT_TYPES;
+            case INTEGER:
+                return SqlTypeName.INT_TYPES;
+            case DATETIME:
+                return SqlTypeName.DATETIME_TYPES;
+            case DATETIME_INTERVAL:
+                return SqlTypeName.INTERVAL_TYPES;
+            case GEO:
+                return SqlTypeName.GEOMETRY_TYPES;
+            case MULTISET:
+                return ImmutableList.of(SqlTypeName.MULTISET);
+            case ARRAY:
+                return ImmutableList.of(SqlTypeName.ARRAY);
+            case MAP:
+                return ImmutableList.of(SqlTypeName.MAP);
+            case NULL:
+                return ImmutableList.of(SqlTypeName.NULL);
+            case ANY:
+                return SqlTypeName.ALL_TYPES;
+            case CURSOR:
+                return ImmutableList.of(SqlTypeName.CURSOR);
+            case COLUMN_LIST:
+                return ImmutableList.of(SqlTypeName.COLUMN_LIST);
+            case VARIANT:
+                return ImmutableList.of(SqlTypeName.VARIANT);
+            default:
+                throw new IllegalArgumentException();
+        }
+    }
+
+    /** Return the default {@link RelDataType} that belongs to this family. */
+    public @Nullable RelDataType getDefaultConcreteType(RelDataTypeFactory factory) {
+        switch (this) {
+            case CHARACTER:
+                return factory.createSqlType(SqlTypeName.VARCHAR);
+            case BINARY:
+                return factory.createSqlType(SqlTypeName.VARBINARY);
+            case NUMERIC:
+                return SqlTypeUtil.getMaxPrecisionScaleDecimal(factory);
+            case DATE:
+                return factory.createSqlType(SqlTypeName.DATE);
+            case TIME:
+                return factory.createSqlType(SqlTypeName.TIME);
+            case TIMESTAMP:
+                return factory.createSqlType(SqlTypeName.TIMESTAMP);
+            case BOOLEAN:
+                return factory.createSqlType(SqlTypeName.BOOLEAN);
+            case STRING:
+                return factory.createSqlType(SqlTypeName.VARCHAR);
+            case APPROXIMATE_NUMERIC:
+                return factory.createSqlType(SqlTypeName.DOUBLE);
+            case EXACT_NUMERIC:
+                return SqlTypeUtil.getMaxPrecisionScaleDecimal(factory);
+            case INTEGER:
+                return factory.createSqlType(SqlTypeName.BIGINT);
+            case DECIMAL:
+                return factory.createSqlType(SqlTypeName.DECIMAL);
+            case DATETIME:
+                return factory.createSqlType(SqlTypeName.TIMESTAMP);
+            case INTERVAL_DAY_TIME:
+                return factory.createSqlIntervalType(
+                        new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO));
+            case INTERVAL_YEAR_MONTH:
+                return factory.createSqlIntervalType(
+                        new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO));
+            case GEO:
+                return factory.createSqlType(SqlTypeName.GEOMETRY);
+            case MULTISET:
+                return factory.createMultisetType(factory.createSqlType(SqlTypeName.ANY), -1);
+            case ARRAY:
+                return factory.createArrayType(factory.createSqlType(SqlTypeName.ANY), -1);
+            case MAP:
+                return factory.createMapType(
+                        factory.createSqlType(SqlTypeName.ANY),
+                        factory.createSqlType(SqlTypeName.ANY));
+            case NULL:
+                return factory.createSqlType(SqlTypeName.NULL);
+            case CURSOR:
+                return factory.createSqlType(SqlTypeName.CURSOR);
+            case COLUMN_LIST:
+                return factory.createSqlType(SqlTypeName.COLUMN_LIST);

Review Comment:
   The getDefaultConcreteType method is missing a case for the VARIANT type family. When the switch statement reaches the default case, it returns null, which may cause issues when VARIANT is used in contexts that require a default concrete type. Add a case for VARIANT that returns factory.createSqlType(SqlTypeName.VARIANT).
   ```suggestion
                   return factory.createSqlType(SqlTypeName.COLUMN_LIST);
               case VARIANT:
                   return factory.createSqlType(SqlTypeName.VARIANT);
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -1096,4 +1098,37 @@ public static boolean lessThanOrEqual(Object lhs, Object rhs) {
         }
         return universalCompares(lhs, rhs) <= 0;
     }
+
+    public static Variant tryParseJson(String jsonStr) {
+        return tryParseJson(jsonStr, false);
+    }
+
+    public static Variant tryParseJson(String jsonStr, boolean allowDuplicateKeys) {
+        if (jsonStr == null || jsonStr.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+    public static Variant parseJson(String jsonStr) {
+        return parseJson(jsonStr, false);
+    }
+
+    public static Variant parseJson(String jsonStr, boolean allowDuplicateKeys) {
+        if (jsonStr == null || jsonStr.isEmpty()) {
+            return null;

Review Comment:
   Both parseJson and tryParseJson return null for empty strings, which makes their behavior identical in this case. This seems inconsistent with the purpose of these functions. Consider whether parseJson should throw an exception for empty strings (treating it as invalid JSON) while tryParseJson returns null, to maintain a clear distinction between the two functions' error handling approaches.
   ```suggestion
               throw new IllegalArgumentException("JSON string must not be null or empty");
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/calcite/sql/type/SqlTypeFamily.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.type;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+
+import org.apache.calcite.avatica.util.TimeUnit;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeFamily;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import javax.annotation.Nullable;
+
+import java.sql.Types;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * SqlTypeFamily provides SQL type categorization.
+ *
+ * <p>The <em>primary</em> family categorization is a complete disjoint partitioning of SQL types
+ * into families, where two types are members of the same primary family iff instances of the two
+ * types can be the operands of an SQL equality predicate such as <code>WHERE v1 = v2</code>.
+ * Primary families are returned by RelDataType.getFamily().
+ *
+ * <p>There is also a <em>secondary</em> family categorization which overlaps with the primary
+ * categorization. It is used in type strategies for more specific or more general categorization
+ * than the primary families. Secondary families are never returned by RelDataType.getFamily().
+ *
+ * <p>This class was copied over from Calcite to support variant type(CALCITE-4918). When upgrading
+ * to Calcite 1.39.0 version, please remove the entire class.
+ *
+ * <p>changList: 1. Add variant type: Line 84, Line 120, Line 218~219.

Review Comment:
   The word "changList" should be spelled "changeList" in the documentation comment.
   ```suggestion
    * <p>changeList: 1. Add variant type: Line 84, Line 120, Line 218~219.
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##########
@@ -1096,4 +1098,37 @@ public static boolean lessThanOrEqual(Object lhs, Object rhs) {
         }
         return universalCompares(lhs, rhs) <= 0;
     }
+
+    public static Variant tryParseJson(String jsonStr) {
+        return tryParseJson(jsonStr, false);
+    }
+
+    public static Variant tryParseJson(String jsonStr, boolean allowDuplicateKeys) {
+        if (jsonStr == null || jsonStr.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+    public static Variant parseJson(String jsonStr) {
+        return parseJson(jsonStr, false);
+    }
+
+    public static Variant parseJson(String jsonStr, boolean allowDuplicateKeys) {
+        if (jsonStr == null || jsonStr.isEmpty()) {
+            return null;
+        }
+
+        try {
+            return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
+        } catch (Throwable e) {
+            throw new IllegalArgumentException(
+                    String.format("Failed to parse json string: %s", jsonStr), e);
+        }
+    }

Review Comment:
   The parseJson and tryParseJson functions lack documentation comments explaining their purpose, parameters, and behavior differences. Add JavaDoc comments to document:
   1. The purpose of each function (parseJson throws exceptions on parse errors, tryParseJson returns null)
   2. The meaning of the allowDuplicateKeys parameter
   2. The meaning of the allowDuplicateKeys parameter
   3. The return value (Variant or null)
   4. Examples of when to use each function
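   One possible shape for those comments, wrapped around the bodies already in this diff (the wording is only a suggestion, not the author's):
   ```java
   /**
    * Parses {@code jsonStr} into a {@link Variant}, returning {@code null} instead of
    * throwing when the input is null, empty, or cannot be parsed. Prefer this variant
    * when malformed input should be tolerated rather than fail the transform.
    *
    * @param jsonStr JSON text to parse
    * @param allowDuplicateKeys whether duplicate object keys are tolerated rather than
    *     treated as a parse error
    * @return the parsed {@link Variant}, or {@code null} if the input cannot be parsed
    */
   public static Variant tryParseJson(String jsonStr, boolean allowDuplicateKeys) {
       if (jsonStr == null || jsonStr.isEmpty()) {
           return null;
       }
       try {
           return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
       } catch (Throwable e) {
           return null;
       }
   }

   /**
    * Parses {@code jsonStr} into a {@link Variant}. Null or empty input yields
    * {@code null}; any other parse failure is reported as an exception, so use this
    * variant when invalid JSON should be treated as an error.
    *
    * @param jsonStr JSON text to parse
    * @param allowDuplicateKeys whether duplicate object keys are tolerated rather than
    *     treated as a parse error
    * @return the parsed {@link Variant}, or {@code null} for null/empty input
    * @throws IllegalArgumentException if {@code jsonStr} cannot be parsed as JSON
    */
   public static Variant parseJson(String jsonStr, boolean allowDuplicateKeys) {
       if (jsonStr == null || jsonStr.isEmpty()) {
           return null;
       }
       try {
           return BinaryVariantInternalBuilder.parseJson(jsonStr, allowDuplicateKeys);
       } catch (Throwable e) {
           throw new IllegalArgumentException(
                   String.format("Failed to parse json string: %s", jsonStr), e);
       }
   }
   ```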


