This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32849a284b34931392eb97c29d46a911c012f9ef Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Mar 4 15:10:02 2025 +0100 [hotfix] Move RowTypeUtils to flink-table-common This allows RowTypeUtils to be used both in the planner and in the runtime --- .../main/java/org/apache/flink/table}/typeutils/RowTypeUtils.java | 6 ++++-- .../java/org/apache/flink/table}/typeutils/RowTypeUtilsTest.java | 2 +- .../batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java | 2 +- .../flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java | 2 +- .../stream/WatermarkAssignerChangelogNormalizeTransposeRule.java | 2 +- .../table/planner/codegen/agg/batch/HashAggCodeGenerator.scala | 2 +- .../table/planner/codegen/agg/batch/SortAggCodeGenerator.scala | 2 +- .../flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala | 2 +- .../planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala | 4 ++-- .../table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala | 6 +++--- .../planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala | 2 +- .../org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala | 2 +- .../apache/flink/table/planner/plan/utils/OverAggregateUtil.scala | 2 +- .../org/apache/flink/table/planner/plan/utils/WindowUtil.scala | 2 +- 14 files changed, 20 insertions(+), 18 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/RowTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowTypeUtils.java similarity index 95% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/RowTypeUtils.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowTypeUtils.java index 4d9879d7b99..d70c3ef1b98 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/typeutils/RowTypeUtils.java +++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/typeutils/RowTypeUtils.java @@ -16,8 +16,9 @@ * limitations under the License. */ -package org.apache.flink.table.planner.typeutils; +package org.apache.flink.table.typeutils; +import org.apache.flink.annotation.Internal; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; @@ -28,7 +29,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -/** Utils for deriving row types of {@link org.apache.calcite.rel.RelNode}s. */ +/** Utils for deriving row types of org.apache.calcite.rel.RelNode. */ +@Internal public class RowTypeUtils { public static String getUniqueName(String oldName, List<String> checklist) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/typeutils/RowTypeUtilsTest.java similarity index 98% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java rename to flink-table/flink-table-common/src/test/java/org/apache/flink/table/typeutils/RowTypeUtilsTest.java index 143a21503e3..b1851c62aa5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/typeutils/RowTypeUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/typeutils/RowTypeUtilsTest.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.table.planner.typeutils; +package org.apache.flink.table.typeutils; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.BigIntType; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java index 4f8496cc15d..fa242100a1c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/runtimefilter/BatchExecLocalRuntimeFilterBuilder.java @@ -34,12 +34,12 @@ import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.typeutils.RowTypeUtils; import org.apache.flink.table.runtime.generated.GeneratedProjection; import org.apache.flink.table.runtime.operators.runtimefilter.LocalRuntimeFilterBuilderOperator; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.typeutils.RowTypeUtils; import java.util.List; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java index 8b6c5614ee0..59c517b9474 100644 --- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java @@ -42,7 +42,6 @@ import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink; import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; -import org.apache.flink.table.planner.typeutils.RowTypeUtils; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer; @@ -52,6 +51,7 @@ import org.apache.flink.table.runtime.typeutils.TypeCheckUtils; import org.apache.flink.table.runtime.util.StateConfigUtil; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.typeutils.RowTypeUtils; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java index c55c872b224..e790df7c9e0 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRule.java @@ -25,7 +25,7 @@ import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalE import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner; import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution; import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef; -import org.apache.flink.table.planner.typeutils.RowTypeUtils; +import org.apache.flink.table.typeutils.RowTypeUtils; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala index f51fddc005a..075f7343d73 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenerator.scala @@ -26,12 +26,12 @@ import org.apache.flink.table.functions.DeclarativeAggregateFunction import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CodeGenUtils, OperatorCodeGenerator, ProjectionCodeGenerator} import org.apache.flink.table.planner.codegen.CodeGenUtils.ROW_DATA import org.apache.flink.table.planner.plan.utils.AggregateInfoList -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.runtime.generated.GeneratedOperator import org.apache.flink.table.runtime.operators.TableStreamOperator import org.apache.flink.table.runtime.operators.aggregate.BytesHashMapSpillMemorySegmentPool import org.apache.flink.table.runtime.util.collections.binary.BytesMap import 
org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.calcite.tools.RelBuilder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala index dc2605bb930..f68c302239a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/SortAggCodeGenerator.scala @@ -25,10 +25,10 @@ import org.apache.flink.table.functions.AggregateFunction import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CodeGenUtils, ProjectionCodeGenerator} import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateCollect import org.apache.flink.table.planner.plan.utils.AggregateInfoList -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.runtime.generated.GeneratedOperator import org.apache.flink.table.runtime.operators.TableStreamOperator import org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.calcite.tools.RelBuilder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala index 41bbe7d2bee..66ea53c2b1e 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala @@ -38,7 +38,6 @@ import 
org.apache.flink.table.planner.expressions.ExpressionBuilder._ import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} import org.apache.flink.table.planner.plan.utils.{AggregateInfo, AggregateInfoList, AggregateUtil} -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTypeFactory import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty import org.apache.flink.table.runtime.operators.window.TimeWindow @@ -46,6 +45,7 @@ import org.apache.flink.table.runtime.operators.window.grouping.{HeapWindowsGrou import org.apache.flink.table.runtime.util.RowIterator import org.apache.flink.table.types.logical._ import org.apache.flink.table.types.logical.LogicalTypeRoot.INTERVAL_DAY_TIME +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.flink.table.utils.DateTimeUtils import org.apache.calcite.rel.core.AggregateCall diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala index 8d2fd418446..a036aeb420c 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGenerator.scala @@ -20,11 +20,11 @@ package org.apache.flink.table.planner.codegen.runtimefilter import org.apache.flink.runtime.operators.util.BloomFilter import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, OperatorCodeGenerator, ProjectionCodeGenerator} -import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM, ROW_DATA} +import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, DEFAULT_INPUT1_TERM, DEFAULT_INPUT2_TERM} import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.{generateCollect, INPUT_SELECTION} -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.runtime.operators.CodeGenOperatorFactory import org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.flink.util.Preconditions /** Operator code generator for runtime filter operator. */ diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala index 7035fe7a419..3402703cf81 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/HashAggFusionCodegenSpec.scala @@ -26,17 +26,17 @@ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, CodeGenUtil import org.apache.flink.table.planner.codegen.CodeGenUtils.{getReuseRowFieldExprs, newName, newNames} import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator.genAdaptiveLocalHashAggValueProjectionExpr import org.apache.flink.table.planner.codegen.agg.batch.{AggCodeGenHelper, HashAggCodeGenHelper} -import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping, genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, genGetValueFromFlatAggregateBuffer, genInitFlatAggregateBuffer} -import 
org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenHelper.{buildAggregateAggBuffMapping, genAggregate, genCreateFallbackSorter, genHashAggValueExpr, genRetryAppendToMap, genReusableEmptyAggBuffer, prepareFallbackSorter} +import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper._ +import org.apache.flink.table.planner.codegen.agg.batch.HashAggCodeGenHelper._ import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, OpFusionContext} import org.apache.flink.table.planner.plan.fusion.FusionCodegenUtil.{constructDoConsumeCode, constructDoConsumeFunction, evaluateVariables} import org.apache.flink.table.planner.plan.utils.AggregateInfoList -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.{toJava, toScala} import org.apache.flink.table.runtime.operators.aggregate.BytesHashMapSpillMemorySegmentPool import org.apache.flink.table.runtime.util.KeyValueIterator import org.apache.flink.table.runtime.util.collections.binary.{BytesHashMap, BytesMap} import org.apache.flink.table.types.logical.{LogicalType, RowType} +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.calcite.tools.RelBuilder diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala index a7e075b6217..0f049a844aa 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/fusion/spec/RuntimeFilterFusionCodegenSpec.scala @@ -22,8 +22,8 @@ import org.apache.flink.table.data.binary.BinaryRowData import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression} 
import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, newNames} import org.apache.flink.table.planner.plan.fusion.{OpFusionCodegenSpecBase, OpFusionContext} -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.types.logical.RowType +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.flink.util.Preconditions import java.util diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala index a7a61f378aa..671e3401256 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Rank.scala @@ -19,8 +19,8 @@ package org.apache.flink.table.planner.plan.nodes.calcite import org.apache.flink.table.api.TableException import org.apache.flink.table.planner.plan.utils._ -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, RankRange, RankType, VariableRankRange} +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField, RelDataTypeFieldImpl} diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala index 389c34a30e6..d3e1128aa48 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala +++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/OverAggregateUtil.scala @@ -22,7 +22,7 @@ import org.apache.flink.table.planner.JArrayList import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.spec.{OverSpec, PartitionSpec} import org.apache.flink.table.planner.plan.nodes.exec.spec.OverSpec.GroupSpec -import org.apache.flink.table.planner.typeutils.RowTypeUtils +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.calcite.plan.RelOptCluster import org.apache.calcite.rel.`type`.RelDataType diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala index 5be058832c6..9102f68528d 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala @@ -29,13 +29,13 @@ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery import org.apache.flink.table.planner.plan.nodes.logical._ import org.apache.flink.table.planner.plan.utils.AggregateUtil.inferAggAccumulatorNames import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy.{TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED, TABLE_EXEC_EMIT_LATE_FIRE_ENABLED} -import org.apache.flink.table.planner.typeutils.RowTypeUtils import org.apache.flink.table.runtime.groupwindow._ import org.apache.flink.table.runtime.operators.window.tvf.common.WindowAssigner import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceAssigner import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType import org.apache.flink.table.types.logical.TimestampType import 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.canBeTimeAttributeType +import org.apache.flink.table.typeutils.RowTypeUtils import org.apache.calcite.plan.volcano.RelSubset import org.apache.calcite.rel.`type`.RelDataType
