dylanhz commented on code in PR #27483:
URL: https://github.com/apache/flink/pull/27483#discussion_r2922004494


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -463,6 +463,43 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                             "org.apache.flink.table.runtime.functions.scalar.UrlEncodeFunction")
                     .build();
 
+    /**
+     * Converts an IPv4 address string to its numeric representation. Follows MySQL INET_ATON
+     * behavior.
+     *
+     * <p>Supports MySQL-compatible short-form addresses (a.b, a.b.c) and treats leading zeros as
+     * decimal.
+     *
+     * <p>Example: INET_ATON('127.0.0.1') returns 2130706433; INET_ATON('127.1') returns 2130706433
+     */

Review Comment:
   nit: We don't add comments here usually.



##########
flink-python/pyflink/table/expression.py:
##########
@@ -1447,6 +1447,43 @@ def url_encode(self) -> 'Expression[str]':
         """
         return _unary_op("urlEncode")(self)
 
+    def inet_aton(self) -> 'Expression':
+        """
+        Converts an IPv4 address string to its numeric representation (BIGINT).
+        This function follows MySQL INET_ATON behavior.
+
+        The conversion formula is: A * 256^3 + B * 256^2 + C * 256 + D for an IP address A.B.C.D
+
+        Supports MySQL-compatible short-form addresses:
+          - a.b is interpreted as a.0.0.b
+          - a.b.c is interpreted as a.b.0.c
+
+        Examples:
+          - lit('127.0.0.1').inet_aton() returns 2130706433
+          - lit('127.1').inet_aton() returns 2130706433 (short-form)
+          - lit('0.0.0.0').inet_aton() returns 0
+
+        :return: the numeric representation of the IP address, or null if the input is
+                 null or invalid
+        """
+        return _unary_op("inetAton")(self)
+
+    def inet_ntoa(self) -> 'Expression[str]':
+        """
+        Converts a numeric IPv4 address representation (BIGINT or INT) back to its string format.
+        This function follows MySQL INET_NTOA behavior.

Review Comment:
   Is there any compatibility issue with this function? If not, we can remove this
line and update the docs.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_ATON}.
+ *
+ * <p>This function converts an IPv4 address string to its numeric representation. It follows the
+ * MySQL INET_ATON function behavior, including support for short-form IPv4 addresses.
+ *
+ * <p>The conversion formula for a standard IP address A.B.C.D is: A * 256^3 + B * 256^2 + C * 256 +
+ * D
+ *
+ * <p>MySQL-compatible short-form IPv4 addresses are supported:
+ *
+ * <ul>
+ *   <li>a.b is interpreted as a.0.0.b
+ *   <li>a.b.c is interpreted as a.b.0.c
+ * </ul>
+ *
+ * <p>Leading zeros in octets are parsed as decimal (consistent with MySQL), not octal.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not supported.
+ *
+ * <p><b>Implementation Note:</b> This implementation does not use utility classes such as {@code
+ * com.google.common.net.InetAddresses} or {@code sun.net.util.IPAddressUtil} because:
+ *
+ * <ul>
+ *   <li>Guava's {@code InetAddresses.forString()} does not support MySQL-compatible short-form IP
+ *       addresses (e.g., "127.1" interpreted as "127.0.0.1")
+ *   <li>Standard IP parsers may interpret leading zeros as octal (e.g., "010" as 8), while MySQL
+ *       treats them as decimal (e.g., "010" as 10)
+ *   <li>{@code sun.net.util.IPAddressUtil} is a JDK internal API requiring {@code --add-exports},
+ *       which introduces JDK version compatibility issues
+ * </ul>
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ *   <li>INET_ATON('127.0.0.1') returns 2130706433
+ *   <li>INET_ATON('127.1') returns 2130706433 (short-form: 127.0.0.1)
+ *   <li>INET_ATON('127.0.1') returns 2130706433 (short-form: 127.0.0.1)
+ *   <li>INET_ATON('10.0.0.1') returns 167772161
+ *   <li>INET_ATON('0.0.0.0') returns 0
+ * </ul>
+ */
+@Internal
+public class InetAtonFunction extends BuiltInScalarFunction {
+
+    public InetAtonFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.INET_ATON, context);
+    }
+
+    /**
+     * Converts an IPv4 address string to its numeric representation.
+     *
+     * @param ipAddress the IPv4 address string in dotted-decimal notation (supports short-form)
+     * @return the numeric representation of the IP address, or null if input is null or invalid
+     */
+    public @Nullable Long eval(@Nullable StringData ipAddress) {
+        if (ipAddress == null) {

Review Comment:
   Use `BinaryStringDataUtil.isEmpty()` to avoid unnecessary conversion.
   
   BTW, it looks like we can implement this function using just the
`BinaryStringData` API, without converting to `String`.
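
   To illustrate the idea, here is a minimal sketch of parsing directly from bytes. It is not the PR's code: the class `InetAtonBytesSketch` is hypothetical, and a plain `byte[]` stands in for whatever byte-level access `BinaryStringData` offers, so the mapping onto the real Flink API is an assumption left to the author.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-alone sketch; a byte[] stands in for the binary string. */
final class InetAtonBytesSketch {

    /** Returns the numeric IPv4 value, or null for empty, malformed, or out-of-range input. */
    static Long parse(byte[] bytes) {
        if (bytes == null || bytes.length == 0) {
            return null;
        }
        long[] parts = new long[4];
        int count = 0;    // components completed so far
        long current = 0; // value of the component being read
        int digits = 0;   // digits seen in the current component
        for (byte b : bytes) {
            if (b >= '0' && b <= '9') {
                current = current * 10 + (b - '0'); // leading zeros stay decimal
                digits++;
                if (current > 255) {
                    return null;
                }
            } else if (b == '.') {
                if (digits == 0 || count == 3) {
                    return null; // empty component, or a fifth component would follow
                }
                parts[count++] = current;
                current = 0;
                digits = 0;
            } else {
                return null; // any non-digit, non-dot byte (including non-ASCII)
            }
        }
        if (digits == 0) {
            return null; // trailing dot
        }
        parts[count++] = current;
        if (count < 2) {
            return null; // mirrors the octetCount < 2 check in the diff above
        }
        // The last component is the lowest octet; leading ones fill from the top.
        long result = parts[count - 1];
        for (int i = 0; i < count - 1; i++) {
            result |= parts[i] << (24 - 8 * i);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("127.1".getBytes(StandardCharsets.UTF_8)));
    }
}
```

   No intermediate `String` is created during parsing; the only allocation is the small `long[4]` scratch array, which could also be replaced by a running accumulator.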



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_ATON}.
+ *
+ * <p>This function converts an IPv4 address string to its numeric representation. It follows the
+ * MySQL INET_ATON function behavior, including support for short-form IPv4 addresses.
+ *
+ * <p>The conversion formula for a standard IP address A.B.C.D is: A * 256^3 + B * 256^2 + C * 256 +
+ * D
+ *
+ * <p>MySQL-compatible short-form IPv4 addresses are supported:
+ *
+ * <ul>
+ *   <li>a.b is interpreted as a.0.0.b
+ *   <li>a.b.c is interpreted as a.b.0.c
+ * </ul>
+ *
+ * <p>Leading zeros in octets are parsed as decimal (consistent with MySQL), not octal.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not supported.
+ *
+ * <p><b>Implementation Note:</b> This implementation does not use utility classes such as {@code
+ * com.google.common.net.InetAddresses} or {@code sun.net.util.IPAddressUtil} because:
+ *
+ * <ul>
+ *   <li>Guava's {@code InetAddresses.forString()} does not support MySQL-compatible short-form IP
+ *       addresses (e.g., "127.1" interpreted as "127.0.0.1")
+ *   <li>Standard IP parsers may interpret leading zeros as octal (e.g., "010" as 8), while MySQL
+ *       treats them as decimal (e.g., "010" as 10)
+ *   <li>{@code sun.net.util.IPAddressUtil} is a JDK internal API requiring {@code --add-exports},
+ *       which introduces JDK version compatibility issues
+ * </ul>

Review Comment:
   nit: Would it be better to leave this note in the PR comments rather than in
the code base?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetAtonFunction.java:
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_ATON}.
+ *
+ * <p>This function converts an IPv4 address string to its numeric representation. It follows the
+ * MySQL INET_ATON function behavior, including support for short-form IPv4 addresses.
+ *
+ * <p>The conversion formula for a standard IP address A.B.C.D is: A * 256^3 + B * 256^2 + C * 256 +
+ * D
+ *
+ * <p>MySQL-compatible short-form IPv4 addresses are supported:
+ *
+ * <ul>
+ *   <li>a.b is interpreted as a.0.0.b
+ *   <li>a.b.c is interpreted as a.b.0.c
+ * </ul>
+ *
+ * <p>Leading zeros in octets are parsed as decimal (consistent with MySQL), not octal.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not supported.
+ *
+ * <p><b>Implementation Note:</b> This implementation does not use utility classes such as {@code
+ * com.google.common.net.InetAddresses} or {@code sun.net.util.IPAddressUtil} because:
+ *
+ * <ul>
+ *   <li>Guava's {@code InetAddresses.forString()} does not support MySQL-compatible short-form IP
+ *       addresses (e.g., "127.1" interpreted as "127.0.0.1")
+ *   <li>Standard IP parsers may interpret leading zeros as octal (e.g., "010" as 8), while MySQL
+ *       treats them as decimal (e.g., "010" as 10)
+ *   <li>{@code sun.net.util.IPAddressUtil} is a JDK internal API requiring {@code --add-exports},
+ *       which introduces JDK version compatibility issues
+ * </ul>
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ *   <li>INET_ATON('127.0.0.1') returns 2130706433
+ *   <li>INET_ATON('127.1') returns 2130706433 (short-form: 127.0.0.1)
+ *   <li>INET_ATON('127.0.1') returns 2130706433 (short-form: 127.0.0.1)
+ *   <li>INET_ATON('10.0.0.1') returns 167772161
+ *   <li>INET_ATON('0.0.0.0') returns 0
+ * </ul>
+ */
+@Internal
+public class InetAtonFunction extends BuiltInScalarFunction {
+
+    public InetAtonFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.INET_ATON, context);
+    }
+
+    /**
+     * Converts an IPv4 address string to its numeric representation.
+     *
+     * @param ipAddress the IPv4 address string in dotted-decimal notation (supports short-form)
+     * @return the numeric representation of the IP address, or null if input is null or invalid
+     */
+    public @Nullable Long eval(@Nullable StringData ipAddress) {
+        if (ipAddress == null) {
+            return null;
+        }
+
+        final String ip = ipAddress.toString();
+        if (ip.isEmpty()) {
+            return null;
+        }
+
+        return ipToLong(ip);
+    }
+
+    /**
+     * Converts an IPv4 address string to a long value.
+     *
+     * <p>Supports MySQL-compatible short-form addresses:
+     *
+     * <ul>
+     *   <li>a.b -> a.0.0.b
+     *   <li>a.b.c -> a.b.0.c
+     *   <li>a.b.c.d -> standard format
+     * </ul>
+     *
+     * <p>Leading zeros are treated as decimal (not octal), consistent with MySQL behavior.
+     *
+     * @param ip the IPv4 address string
+     * @return the long value, or null if the IP address is invalid
+     */
+    private static @Nullable Long ipToLong(String ip) {
+        int len = ip.length();
+        int octetCount = 0;
+        int octetStart = 0;
+        long[] octets = new long[4];
+
+        for (int i = 0; i <= len; i++) {
+            if (i == len || ip.charAt(i) == '.') {
+                if (octetCount >= 4) {
+                    return null;
+                }
+                if (octetStart == i) {
+                    return null;
+                }
+
+                // Parse number manually
+                long octet = 0;
+                for (int j = octetStart; j < i; j++) {
+                    char c = ip.charAt(j);
+                    if (c < '0' || c > '9') {
+                        return null;
+                    }
+                    octet = octet * 10 + (c - '0');
+                    if (octet > 255) {
+                        return null;
+                    }
+                }
+                octets[octetCount++] = octet;
+                octetStart = i + 1;
+            }
+        }
+
+        if (octetCount < 2) {
+            return null;
+        }

Review Comment:
   It seems that MySQL supports single-number addresses, for example `'1' -> 1`.
Could you double-check this behavior?
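
   For reference, a minimal sketch of how a single-pass parser could accept a lone component while keeping the other checks from the diff. It assumes the `'1' -> 1` example above holds, which is worth confirming against a real MySQL server; the class name `InetAtonSingleComponentSketch` is hypothetical and the code is not taken from the PR.

```java
/** Hypothetical sketch: same validation as the diff above, but one component is allowed. */
final class InetAtonSingleComponentSketch {

    static Long parse(String ip) {
        if (ip == null || ip.isEmpty()) {
            return null;
        }
        long result = 0; // completed components, packed 8 bits each
        long part = 0;   // component currently being read
        int digits = 0;
        int dots = 0;
        for (int i = 0; i < ip.length(); i++) {
            char c = ip.charAt(i);
            if (c >= '0' && c <= '9') {
                part = part * 10 + (c - '0');
                digits++;
                if (part > 255) {
                    return null;
                }
            } else if (c == '.') {
                if (digits == 0 || dots == 3) {
                    return null; // empty component or too many components
                }
                result = (result << 8) | part;
                part = 0;
                digits = 0;
                dots++;
            } else {
                return null;
            }
        }
        if (digits == 0) {
            return null; // trailing dot
        }
        // Short forms: shift so that the final component lands in the lowest octet.
        if (dots == 1) {
            result <<= 16; // a.b   -> a.0.0.b
        } else if (dots == 2) {
            result <<= 8;  // a.b.c -> a.b.0.c
        }
        // With zero dots, result is still 0, so the value is just the component itself.
        return (result << 8) | part;
    }

    public static void main(String[] args) {
        System.out.println(parse("1"));
        System.out.println(parse("127.1"));
    }
}
```

   Under this sketch a lone component above 255 is still rejected, because the per-component range check applies before any shifting.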



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/InetNtoaFunction.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#INET_NTOA}.
+ *
+ * <p>This function converts a numeric IPv4 address representation back to its string format. It
+ * follows the MySQL INET_NTOA function behavior.
+ *
+ * <p>The conversion extracts each octet from the numeric value using bit shifting and masking.
+ *
+ * <p>Note: This function only supports IPv4 addresses. IPv6 addresses are not supported.
+ *
+ * <p><b>Implementation Note:</b> This implementation does not use utility classes such as {@code
+ * com.google.common.net.InetAddresses} or {@code sun.net.util.IPAddressUtil} because:
+ *
+ * <ul>
+ *   <li>The conversion from number to IP string is straightforward and does not benefit from
+ *       external libraries
+ *   <li>Using a custom implementation avoids unnecessary dependencies and object allocations (e.g.,
+ *       creating {@code InetAddress} objects)
+ *   <li>{@code sun.net.util.IPAddressUtil} is a JDK internal API requiring {@code --add-exports},
+ *       which introduces JDK version compatibility issues
+ * </ul>
+ *
+ * <p>Examples:
+ *
+ * <ul>
+ *   <li>INET_NTOA(2130706433) returns '127.0.0.1'
+ *   <li>INET_NTOA(167772161) returns '10.0.0.1'
+ *   <li>INET_NTOA(0) returns '0.0.0.0'
+ * </ul>
+ */
+@Internal
+public class InetNtoaFunction extends BuiltInScalarFunction {
+
+    private static final long MAX_IPV4_VALUE = 0xFFFFFFFFL;
+    private static final int OCTET_MASK = 0xFF;
+    private static final int BITS_PER_OCTET = 8;
+
+    public InetNtoaFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.INET_NTOA, context);
+    }
+
+    /**
+     * Converts a numeric IPv4 address representation (Long) to its string format.
+     *
+     * @param ipNumber the numeric representation of the IPv4 address
+     * @return the IPv4 address string in dotted-decimal notation, or null if input is null or out
+     *     of valid range
+     */
+    public @Nullable StringData eval(@Nullable Long ipNumber) {
+        if (ipNumber == null) {
+            return null;
+        }
+        return convertToIp(ipNumber);
+    }
+
+    /**
+     * Converts a numeric IPv4 address representation (Integer) to its string format.
+     *
+     * <p>This overload handles INT type inputs which Flink may pass directly without implicit
+     * conversion to Long.
+     *
+     * @param ipNumber the numeric representation of the IPv4 address
+     * @return the IPv4 address string in dotted-decimal notation, or null if input is null or out
+     *     of valid range
+     */
+    public @Nullable StringData eval(@Nullable Integer ipNumber) {
+        if (ipNumber == null) {
+            return null;
+        }
+        // Convert to long, treating negative integers as unsigned
+        // For example, -1 (0xFFFFFFFF as signed int) should be treated as 4294967295
+        return convertToIp(Integer.toUnsignedLong(ipNumber));
+    }
+
+    /**
+     * Internal conversion method.
+     *
+     * @param ipNumber the numeric representation
+     * @return the IPv4 address string, or null if out of valid range
+     */
+    private @Nullable StringData convertToIp(long ipNumber) {
+        // Check if the number is within valid IPv4 range [0, 4294967295]
+        if (ipNumber < 0 || ipNumber > MAX_IPV4_VALUE) {
+            return null;
+        }
+
+        return StringData.fromString(longToIp(ipNumber));
+    }
+
+    /**
+     * Converts a numeric IPv4 address representation to its string format.
+     *
+     * @param ipNumber the numeric representation
+     * @return the IPv4 address string
+     */
+    private static String longToIp(long ipNumber) {

Review Comment:
   nit: `convertToIp` and `longToIp` can be merged into one, and the static 
constants can also be inlined, as the values are self-explanatory in this 
context.
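
   A minimal sketch of what the merged helper might look like, with the range check and the octet extraction in one method and the literals inlined. The class `InetNtoaSketch` is hypothetical and returns a plain `String` so that it runs on its own; the actual function would still wrap the result with `StringData.fromString`, as in the diff above.

```java
/** Hypothetical sketch of the merged conversion; not the PR's code. */
final class InetNtoaSketch {

    /** Returns dotted-decimal notation, or null if the value is outside [0, 4294967295]. */
    static String toIp(long ip) {
        if (ip < 0 || ip > 0xFFFFFFFFL) {
            return null;
        }
        // Extract each octet by shifting it into the low byte and masking.
        return ((ip >> 24) & 0xFF)
                + "."
                + ((ip >> 16) & 0xFF)
                + "."
                + ((ip >> 8) & 0xFF)
                + "."
                + (ip & 0xFF);
    }

    public static void main(String[] args) {
        System.out.println(toIp(2130706433L));
        System.out.println(toIp(-1L));
    }
}
```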



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -463,6 +463,43 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                             "org.apache.flink.table.runtime.functions.scalar.UrlEncodeFunction")
                     .build();
 
+    /**
+     * Converts an IPv4 address string to its numeric representation. Follows MySQL INET_ATON
+     * behavior.
+     *
+     * <p>Supports MySQL-compatible short-form addresses (a.b, a.b.c) and treats leading zeros as
+     * decimal.
+     *
+     * <p>Example: INET_ATON('127.0.0.1') returns 2130706433; INET_ATON('127.1') returns 2130706433
+     */
+    public static final BuiltInFunctionDefinition INET_ATON =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("INET_ATON")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(sequence(logical(LogicalTypeFamily.CHARACTER_STRING)))
+                    .outputTypeStrategy(explicit(BIGINT().nullable()))
+                    .runtimeClass(
+                            "org.apache.flink.table.runtime.functions.scalar.InetAtonFunction")
+                    .build();
+
+    /**
+     * Converts a numeric IPv4 address representation back to its string format. Follows MySQL
+     * INET_NTOA behavior.
+     *
+     * <p>Accepts BIGINT or INT input. Input must be in valid range [0, 4294967295].
+     *
+     * <p>Example: INET_NTOA(2130706433) returns '127.0.0.1'
+     */
+    public static final BuiltInFunctionDefinition INET_NTOA =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("INET_NTOA")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(sequence(logical(LogicalTypeFamily.INTEGER_NUMERIC)))

Review Comment:
   Could you also test `SMALLINT` and `TINYINT` input?
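
   One reason these cases may deserve explicit tests: a negative narrow integer can widen in several ways, and each yields a different address or a null. The sketch below only demonstrates the plain Java widening rules; the class `NarrowIntWideningSketch` is hypothetical, and which path Flink actually takes for `TINYINT` and `SMALLINT` arguments depends on how the planner maps them onto the `eval` overloads, which this sketch does not model.

```java
/** Hypothetical sketch of how a negative narrow integer can widen to a long. */
final class NarrowIntWideningSketch {

    /** Plain widening keeps the sign, so the range check in the diff above would reject it. */
    static long signExtended(byte v) {
        return v;
    }

    /** byte -> int sign-extends first, then the int is reinterpreted as unsigned 32-bit. */
    static long viaUnsignedInt(byte v) {
        return Integer.toUnsignedLong(v);
    }

    /** Reinterprets only the 8 bits of the byte as unsigned. */
    static long unsignedByte(byte v) {
        return Byte.toUnsignedLong(v);
    }

    /** Reinterprets only the 16 bits of the short as unsigned. */
    static long unsignedShort(short v) {
        return Short.toUnsignedLong(v);
    }

    public static void main(String[] args) {
        byte b = -1;
        short s = -1;
        System.out.println(signExtended(b));   // -1
        System.out.println(viaUnsignedInt(b)); // 4294967295
        System.out.println(unsignedByte(b));   // 255
        System.out.println(unsignedShort(s));  // 65535
    }
}
```

   Tests with a negative `TINYINT` and `SMALLINT` value would pin down which of these interpretations the function ends up with.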


