gaborgsomogyi commented on code in PR #28142: URL: https://github.com/apache/flink/pull/28142#discussion_r3396276188
##########
flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/filter/BoundComparison.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.api.filter;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Comparison strategy resolved from the bound types at filter construction time. */
+enum BoundComparison implements Serializable {
+    LONG {
+        @Override
+        public int compare(Object a, Object b) {
+            return Long.compare(((Number) a).longValue(), ((Number) b).longValue());
+        }
+    },
+    DOUBLE {
+        @Override
+        public int compare(Object a, Object b) {
+            return Double.compare(((Number) a).doubleValue(), ((Number) b).doubleValue());
+        }
+    },
+    NATURAL {
+        @SuppressWarnings("unchecked")
+        @Override
+        public int compare(Object a, Object b) {
+            return ((Comparable<Object>) a).compareTo(b);
+        }
+    };
+
+    public abstract int compare(Object a, Object b);
+
+    static BoundComparison resolve(@Nullable Object lower, @Nullable Object upper) {
+        if (isFloatingPoint(lower) || isFloatingPoint(upper)) {
+            return DOUBLE;
+        }
+        if (lower instanceof Number || upper instanceof Number) {
+            return LONG;
+        }
+        return NATURAL;
+    }

Review Comment:
Widening is needed to avoid silent no-pushdown for common queries like `WHERE k > 3` on a `BIGINT` column. Relying on `getValueAs(targetClass)` alone is not enough since it does not do numeric widening. The two cases that matter in practice are `Integer -> Long` and `Float -> Double`.
A small explicit widening step in `extractValue` covers them without pulling in any type-system dependencies:

```java
@Nullable
private static Object extractValue(ResolvedExpression expr, DataType keyType) {
    if (!(expr instanceof ValueLiteralExpression)) {
        return null;
    }
    ValueLiteralExpression literal = (ValueLiteralExpression) expr;
    Class<Object> literalCls = (Class<Object>) literal.getOutputDataType().getConversionClass();
    Object value = literal.getValueAs(literalCls).orElse(null);
    if (value == null) {
        return null;
    }
    return widenToKeyType(value, keyType.getConversionClass());
}

private static Object widenToKeyType(Object value, Class<?> keyClass) {
    if (keyClass.isInstance(value)) {
        return value;
    }
    if (value instanceof Number) {
        if (keyClass == Long.class) return ((Number) value).longValue();
        if (keyClass == Double.class) return ((Number) value).doubleValue();
    }
    return null; // narrowing or non-numeric: reject filter, falls to remaining
}
```

Narrowing (e.g. `Long -> Integer`) and non-numeric mismatches return `null`, the filter falls to `remaining`, and Flink evaluates it correctly. No precision loss, no silent degradation for the common case.
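One detail worth checking in the snippet above: because the `Long` branch accepts any `Number`, a `Double` literal such as `3.5` against a `BIGINT` key would be truncated to `3` by `longValue()` rather than rejected. A stricter stand-alone variant might look like the sketch below. It assumes that only lossless conversions should be pushed down; `WideningSketch` and the `INTEGRAL` set are hypothetical names for illustration, not Flink APIs.

```java
import java.util.Set;

/** Hypothetical stand-alone sketch; not part of the Flink code base. */
final class WideningSketch {

    // Boxed integral types that convert to long without losing information.
    private static final Set<Class<?>> INTEGRAL =
            Set.of(Byte.class, Short.class, Integer.class, Long.class);

    /** Returns the value widened to keyClass, or null when the conversion is not lossless. */
    static Object widenToKeyType(Object value, Class<?> keyClass) {
        if (value == null) {
            return null;
        }
        if (keyClass.isInstance(value)) {
            return value; // already the key type, nothing to do
        }
        if (keyClass == Long.class && INTEGRAL.contains(value.getClass())) {
            return ((Number) value).longValue(); // e.g. Integer -> Long
        }
        if (keyClass == Double.class && value instanceof Float) {
            return ((Number) value).doubleValue(); // Float -> Double
        }
        return null; // narrowing, lossy, or non-numeric: caller keeps the filter in "remaining"
    }

    public static void main(String[] args) {
        System.out.println(widenToKeyType(3, Long.class));   // prints 3
        System.out.println(widenToKeyType(3.5, Long.class)); // prints null
    }
}
```

With this variant, `WHERE k > 3` on a `BIGINT` key is still pushed down, while `WHERE k > 3.5` returns `null` and is left for Flink to evaluate.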
