fhueske commented on code in PR #27928: URL: https://github.com/apache/flink/pull/27928#discussion_r3259201512
########## flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java: ########## @@ -0,0 +1,1021 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.ArgumentTrait; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.StateHint; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class ProcessTableFunctionTestHarnessTest { + + @DataTypeHint("ROW<value INT>") + public static class PassthroughPTF extends ProcessTableFunction<Row> { + public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) { + collect(input); + } + } + + /** Filter PTF for testing scalar 
argument handling. */ + @DataTypeHint("ROW<value INT>") + public static class FilterPTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input, + @ArgumentHint(ArgumentTrait.SCALAR) Integer threshold) { + // Use named field access - converter enriches Row with field names + int value = input.getFieldAs("value"); + if (value >= threshold) { + collect(input); + } + } + } + + /** PTF for testing transformation of output types. */ + @DataTypeHint("ROW<doubled INT, original INT>") + public static class DoublePTF extends ProcessTableFunction<Row> { + public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) { + int value = (Integer) input.getField(0); + collect(Row.of(value * 2, value)); + } + } + + /** PTF for testing table argument names set via argument hints. */ + @DataTypeHint("ROW<value INT>") + public static class ExplicitNamePTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(value = ArgumentTrait.ROW_SEMANTIC_TABLE, name = "customName") + Row actualParamName) { + collect(actualParamName); + } + } + + /** PTF with inline type annotation - no builder config needed. */ + @DataTypeHint("ROW<doubled INT>") + public static class InlineTypePTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint( + value = ArgumentTrait.ROW_SEMANTIC_TABLE, + type = @DataTypeHint("ROW<value INT>")) + Row input) { + int value = (Integer) input.getField(0); + collect(Row.of(value * 2)); + } + } + + @DataTypeHint("ROW<value INT>") + public static class PartitionedPTF extends ProcessTableFunction<Row> { + public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) { + collect(Row.of((Integer) input.getFieldAs("value"))); + } + } + + /** + * PTF with PASS_COLUMNS_THROUGH for validating that all input columns are prepended to output. 
+ */ + @DataTypeHint("ROW<doubled INT>") + public static class PassColumnsThroughPTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint({ + ArgumentTrait.SET_SEMANTIC_TABLE, + ArgumentTrait.PASS_COLUMNS_THROUGH + }) + Row input) { + int value = (Integer) input.getField(1); + collect(Row.of(value * 2)); + } + } + + /** PTF with OPTIONAL_PARTITION_BY for validating that partition setup can be omitted. */ + @DataTypeHint("ROW<doubled INT>") + public static class OptionalPartitionPTF extends ProcessTableFunction<Row> { + + public void eval( + @ArgumentHint({ + ArgumentTrait.SET_SEMANTIC_TABLE, + ArgumentTrait.OPTIONAL_PARTITION_BY + }) + Row input) { + int value = (Integer) input.getField(1); + collect(Row.of(value * 2)); + } + } + + /** Simple POJO for testing structured type input/output. */ + public static class User { + public String name; + public int age; + + public User() {} + + public User(String name, int age) { + this.name = name; + this.age = age; + } + + @Override + public String toString() { + return "User{name='" + name + "', age=" + age + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + User user = (User) o; + return age == user.age && java.util.Objects.equals(name, user.name); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(name, age); + } + } + + /** PTF for testing structured type inputs. */ + @DataTypeHint("ROW<name STRING, age INT>") + public static class UserPTF extends ProcessTableFunction<Row> { + public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) User user) { + if (user.age >= 18) { + collect(Row.of(user.name, user.age)); + } + } + } + + /** PTF that transforms structured type inputs and outputs. 
*/ + public static class UserTransformPTF extends ProcessTableFunction<User> { + public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) User user) { + User transformed = new User(user.name, user.age + 1); + collect(transformed); + } + } + + /** Invalid PTF - uses reserved argument name "on_time". */ + @DataTypeHint("ROW<value INT>") + public static class InvalidReservedArgOnTimePTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(value = ArgumentTrait.ROW_SEMANTIC_TABLE, name = "on_time") + Row input) { + collect(input); + } + } + + /** Invalid PTF - uses reserved argument name "uid". */ + @DataTypeHint("ROW<value INT>") + public static class InvalidReservedArgUidPTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input, + @ArgumentHint(ArgumentTrait.SCALAR) String uid) { + collect(input); + } + } + + /** Multi-table PTF for validating multi-input processing. */ + @DataTypeHint("ROW<output STRING>") + public static class MultiTableUnionPTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row leftTable, + @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row rightTable) { + if (leftTable != null) { + collect(Row.of("LEFT: " + leftTable)); + } + if (rightTable != null) { + collect(Row.of("RIGHT: " + rightTable)); + } + } + } + + /** + * Multi-table PTF with one POJO and one Row argument, for testing partitioning with structured + * types. 
+ */ + @DataTypeHint("ROW<source STRING, age INT>") + public static class MixedTypeMultiTablePTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) User userTable, + @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row rowTable) { + if (userTable != null) { + collect(Row.of("USER", userTable.age)); + } + if (rowTable != null) { + collect(Row.of("ROW", rowTable.getFieldAs("age"))); + } + } + } + + /** + * Invalid PTF - uses PASS_COLUMNS_THROUGH with multiple table arguments (not allowed per Flink + * docs). + */ + @DataTypeHint("ROW<output STRING>") + public static class InvalidPassColumnsThroughMultiTablePTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint({ + ArgumentTrait.SET_SEMANTIC_TABLE, + ArgumentTrait.PASS_COLUMNS_THROUGH + }) + Row leftTable, + @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row rightTable) { + if (leftTable != null) { + collect(Row.of("LEFT: " + leftTable)); + } + if (rightTable != null) { + collect(Row.of("RIGHT: " + rightTable)); + } + } + } + + /** PTF with only scalar arguments, no tables. */ + @DataTypeHint("ROW<sum INT>") + public static class ScalarOnlyPTF extends ProcessTableFunction<Row> { + public void eval( + @ArgumentHint(ArgumentTrait.SCALAR) Integer a, + @ArgumentHint(ArgumentTrait.SCALAR) Integer b) { + collect(Row.of(a + b)); + } + } + + /** PTF with Context parameter - should be rejected by test harness. */ + @DataTypeHint("ROW<value INT>") + public static class PTFWithContext extends ProcessTableFunction<Row> { + public void eval(Context ctx, @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) { + collect(input); + } + } + + /** PTF with State parameter - should be rejected by test harness. 
*/ + @DataTypeHint("ROW<value INT>") + public static class PTFWithState extends ProcessTableFunction<Row> { + public static class CountState { + public long counter = 0L; + } + + public void eval( + @StateHint CountState state, + @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) { + collect(input); + } + } + + // ------------------------------------------------------------------------- + // Builder Configuration Tests + // ------------------------------------------------------------------------- + + @Test + void testBuilderRejectsDuplicateScalarArguments() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> { + ProcessTableFunctionTestHarness.ofClass(FilterPTF.class) + .withTableArgument("input", DataTypes.of("ROW<value INT>")) + .withScalarArgument("threshold", 50) + .withScalarArgument("threshold", 100); + }); + + assertThat(exception.getMessage()).contains("threshold"); + } + + @Test + void testBuilderRejectsDuplicateTableArguments() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> { + ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class) + .withTableArgument( + "leftTable", DataTypes.of("ROW<id INT, name STRING>")) + .withTableArgument( + "leftTable", DataTypes.of("ROW<id INT, value INT>")); + }); + + assertThat(exception.getMessage()).contains("leftTable"); + } + + @Test + void testBuilderRejectsMixedDuplicateArguments() { + Exception exception = + assertThrows( + IllegalArgumentException.class, + () -> { + ProcessTableFunctionTestHarness.ofClass(FilterPTF.class) + .withTableArgument("input", DataTypes.of("ROW<value INT>")) + .withScalarArgument("input", 42); + }); + + assertThat(exception.getMessage()).contains("input"); + } + + @Test + void testBuilderRejectsReservedArgumentOnTime() { + // We should reject PTFs that use reserved argument name "on_time" + ProcessTableFunctionTestHarness.Builder harnessBuilder = + 
ProcessTableFunctionTestHarness.ofClass(InvalidReservedArgOnTimePTF.class) + .withTableArgument("on_time", DataTypes.of("ROW<id INT>")); + + ValidationException exception = + assertThrows( + ValidationException.class, + () -> { + harnessBuilder.build(); + }); + + assertThat(exception.getMessage()) + .contains("Function signature must not declare system arguments") + .contains("on_time"); + } + + @Test + void testBuilderRejectsReservedArgumentUid() { + // We should reject PTFs that use reserved argument name "uid" + ProcessTableFunctionTestHarness.Builder harnessBuilder = + ProcessTableFunctionTestHarness.ofClass(InvalidReservedArgUidPTF.class) + .withTableArgument("input", DataTypes.of("ROW<id INT>")) + .withScalarArgument("uid", "my-id"); + + ValidationException exception = + assertThrows( + ValidationException.class, + () -> { + harnessBuilder.build(); + }); + + assertThat(exception.getMessage()) + .contains("Function signature must not declare system arguments") + .contains("uid"); + } + + // ------------------------------------------------------------------------- + // Argument Configuration Tests + // ------------------------------------------------------------------------- + + @Test + void testExplicitNameTakesPrecedence() throws Exception { + // Verify that @ArgumentHint(name="customName") takes precedence over actual parameter + // name when processing elements and calling eval. 
+ + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(ExplicitNamePTF.class) + .withTableArgument("customName", DataTypes.of("ROW<value INT>")) + .build()) { + + harness.processElement(Row.of(42)); + harness.processElement(Row.of(100)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(2); + assertThat(output.get(0).getField(0)).isEqualTo(42); + assertThat(output.get(1).getField(0)).isEqualTo(100); + } + } + + @Test + void testScalarOnlyPTF() throws Exception { + // Test scalar-only PTF with no table arguments + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(ScalarOnlyPTF.class) + .withScalarArgument("a", 10) + .withScalarArgument("b", 20) + .build()) { + + harness.process(); + + List<Row> output = harness.getOutput(); + + assertThat(output).hasSize(1); + assertThat(output.get(0).getField(0)).isEqualTo(30); + } + } + + @Test + void testInvokeRejectsTableArguments() throws Exception { + // Verify that process() rejects PTFs with table arguments + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(FilterPTF.class) + .withTableArgument("input", DataTypes.of("ROW<value INT>")) + .withScalarArgument("threshold", 50) + .build()) { + + Exception exception = + assertThrows( + IllegalStateException.class, + () -> { + harness.process(); + }); + + assertThat(exception.getMessage()).contains("process() is only for scalar-only PTFs"); + } + } + + @Test + void testTableProcessingWithScalarArgument() throws Exception { + // Test a PTF that uses a scalar parameter + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(FilterPTF.class) + .withTableArgument("input", DataTypes.of("ROW<value INT>")) + .withScalarArgument("threshold", 50) // Scalar argument: threshold = 50 + .build()) { + + harness.processElement(Row.of(25)); + harness.processElement(Row.of(75)); + 
harness.processElement(Row.of(50)); + harness.processElement(Row.of(10)); + harness.processElement(Row.of(100)); + + List<Row> output = harness.getOutput(); + + assertThat(output).hasSize(3); + assertThat(output.get(0).getField(0)).isEqualTo(75); + assertThat(output.get(1).getField(0)).isEqualTo(50); + assertThat(output.get(2).getField(0)).isEqualTo(100); + } + } + + // ------------------------------------------------------------------------- + // Argument Trait Tests + // ------------------------------------------------------------------------- + + @Test + void testProcessElementWithRowKind() throws Exception { + // Verify RowKind is preserved through processing (ROW_SEMANTIC_TABLE) + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class) + .withTableArgument("input", DataTypes.of("ROW<value INT>")) + .build()) { + + harness.processElement(RowKind.INSERT, 10); + harness.processElement(RowKind.UPDATE_BEFORE, 15); + harness.processElement(RowKind.UPDATE_AFTER, 20); + harness.processElement(RowKind.DELETE, 30); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(4); + assertThat(output.get(0).getKind()).isEqualTo(RowKind.INSERT); + assertThat(output.get(0).getField(0)).isEqualTo(10); + assertThat(output.get(1).getKind()).isEqualTo(RowKind.UPDATE_BEFORE); + assertThat(output.get(1).getField(0)).isEqualTo(15); + assertThat(output.get(2).getKind()).isEqualTo(RowKind.UPDATE_AFTER); + assertThat(output.get(2).getField(0)).isEqualTo(20); + assertThat(output.get(3).getKind()).isEqualTo(RowKind.DELETE); + assertThat(output.get(3).getField(0)).isEqualTo(30); + } + } + + @Test + void testPassColumnsThroughTrait() throws Exception { + // Verify PASS_COLUMNS_THROUGH prepends ALL input columns (not just partition keys) + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PassColumnsThroughPTF.class) + .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>")) + .withPartitionBy("input", "key") + .build()) { + + harness.processElement(Row.of("A", 10)); + harness.processElement(Row.of("B", 20)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(2); + + assertThat(output.get(0)).isEqualTo(Row.of("A", 10, 20)); + assertThat(output.get(1)).isEqualTo(Row.of("B", 20, 40)); + } + } + + @Test + void testOptionalPartitionByWithoutPartition() throws Exception { + // Verify OPTIONAL_PARTITION_BY allows omitting partition configuration + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(OptionalPartitionPTF.class) + .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>")) + .build()) { + + harness.processElement(Row.of("A", 10)); + harness.processElement(Row.of("B", 20)); + harness.processElement(Row.of("C", 30)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(3); + + assertThat(output.get(0)).isEqualTo(Row.of(20)); + assertThat(output.get(1)).isEqualTo(Row.of(40)); + assertThat(output.get(2)).isEqualTo(Row.of(60)); + } + } + + @Test + void testOptionalPartitionByWithPartition() throws Exception { + // Verify OPTIONAL_PARTITION_BY still works when partition is configured + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(OptionalPartitionPTF.class) + .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>")) + .withPartitionBy("input", "key") + .build()) { + + harness.processElement(Row.of("A", 10)); + harness.processElement(Row.of("A", 5)); + harness.processElement(Row.of("B", 20)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(3); + + assertThat(output.get(0)).isEqualTo(Row.of("A", 20)); + assertThat(output.get(1)).isEqualTo(Row.of("A", 10)); + assertThat(output.get(2)).isEqualTo(Row.of("B", 40)); + } + } + + // ------------------------------------------------------------------------- + // Data Type Conversion Tests + 
// ------------------------------------------------------------------------- + + @Test + void testNamedRowFieldOrdering() throws Exception { + // Test what happens when Row field order differs from DataType schema order + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class) + .withTableArgument("input", DataTypes.of("ROW<user STRING, value INT>")) + .build()) { + + Row rowA = Row.withNames(); + rowA.setField("value", 100); + rowA.setField("user", "Alice"); + + harness.processElement(rowA); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(1); + + Row result = output.get(0); + + // Positional access follows schema order + assertThat(result.getField(0)).isEqualTo("Alice"); + assertThat(result.getField(1)).isEqualTo(100); + } + } + + @Test + void testPositionalRowWithWrongTypeOrder() throws Exception { + // Verify that type mismatches are caught when Row values don't match schema types + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class) + .withTableArgument("input", DataTypes.of("ROW<user STRING, value INT>")) + .build()) { + + Row wrongOrderRow = Row.of(10, "Alice"); + + Exception exception = + assertThrows( + ClassCastException.class, () -> harness.processElement(wrongOrderRow)); + + assertThat(exception.getMessage()).contains("Integer"); + assertThat(exception.getMessage()).contains("String"); + assertThat(exception.getMessage()).contains("cannot be cast"); + } + } + + @Test + void testStructuredTypeInput() throws Exception { + // Test that a PTF can declare an input type as a structured type, + // and that the harness can handle the conversion from Row into + // that type. 
+ try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(UserPTF.class) + .withTableArgument("user") + .build()) { + + harness.processElement(Row.of("Alice", 25)); + harness.processElement(Row.of("Bob", 17)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(1); + + Row result = output.get(0); + assertThat(result.getField(0)).isEqualTo("Alice"); + assertThat(result.getField(1)).isEqualTo(25); + } + } + + @Test + void testStructuredTypeInputAndOutput() throws Exception { + // Test PTF with structured type inputs and outputs + try (ProcessTableFunctionTestHarness<User> harness = + ProcessTableFunctionTestHarness.ofClass(UserTransformPTF.class) + .withTableArgument("user") + .build()) { + + harness.processElement(Row.of("Alice", 25)); + + List<User> output = harness.getOutput(); + assertThat(output).hasSize(1); + + User result = output.get(0); + assertThat(result.getClass()).isEqualTo(User.class); + assertThat(result.name).isEqualTo("Alice"); + assertThat(result.age).isEqualTo(26); + } + } + + @Test + void testInlineTypeAnnotation() throws Exception { + // Verify that PTFs can declare table argument types via @ArgumentHint(type = ...) 
+ // without needing .withTableArgument() configuration + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(InlineTypePTF.class).build()) { + + harness.processElement(Row.of(5)); + harness.processElement(Row.of(10)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(2); + assertThat(output.get(0)).isEqualTo(Row.of(10)); + assertThat(output.get(1)).isEqualTo(Row.of(20)); + } + } + + @Test + void testInlineTypeMatchesBuilderConfig() throws Exception { + // Verify that when both inline annotation and builder config are provided with matching + // types, the harness builds successfully + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(InlineTypePTF.class) + .withTableArgument("input", DataTypes.of("ROW<value INT>")) + .build()) { + + harness.processElement(Row.of(7)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(1); + assertThat(output.get(0)).isEqualTo(Row.of(14)); + } + } + + // ------------------------------------------------------------------------- + // Partitioning Tests + // ------------------------------------------------------------------------- + + @Test + void testSetSemanticWithPartitionByName() throws Exception { + // Verify set-semantic table with partition configuration by column name + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class) + .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>")) + .withPartitionBy("input", "key") // Partition by "key" column name + .build()) { + + harness.processElement(Row.of("X", 10)); + harness.processElement(Row.of("Y", 20)); + harness.processElement(Row.of("X", 30)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(3); + assertThat(output.get(0)).isEqualTo(Row.of("X", 10)); + assertThat(output.get(1)).isEqualTo(Row.of("Y", 20)); + 
assertThat(output.get(2)).isEqualTo(Row.of("X", 30)); + } + } + + @Test + void testSetSemanticWithMultiplePartitionColumns() throws Exception { + // Verify composite partition key (multiple columns) + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class) + .withTableArgument( + "input", + DataTypes.of("ROW<region STRING, country STRING, value INT>")) + .withPartitionBy("input", "region", "country") + .build()) { + + harness.processElement(Row.of("EU", "DE", 100)); + harness.processElement(Row.of("EU", "DE", 200)); + harness.processElement(Row.of("EU", "FR", 300)); + harness.processElement(Row.of("US", "NY", 400)); + + List<Row> output = harness.getOutput(); + assertThat(output).hasSize(4); + assertThat(output.get(0)).isEqualTo(Row.of("EU", "DE", 100)); + assertThat(output.get(1)).isEqualTo(Row.of("EU", "DE", 200)); + assertThat(output.get(2)).isEqualTo(Row.of("EU", "FR", 300)); + assertThat(output.get(3)).isEqualTo(Row.of("US", "NY", 400)); + } + } + + @Test + void testSetSemanticWithSelectivePartitioning() throws Exception { + // Verify that only partition columns are automatically included in output + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class) + .withTableArgument( + "input", + DataTypes.of( + "ROW<id INT, region STRING, country STRING, city STRING, value INT>")) + .withPartitionBy("input", "region") + .build()) { + + harness.processElement(Row.of(1, "EU", "DE", "Berlin", 100)); + harness.processElement(Row.of(4, "US", "CA", "LA", 200)); + + List<Row> output = harness.getOutput(); + + assertThat(output.get(0)).isEqualTo(Row.of("EU", 100)); + assertThat(output.get(1)).isEqualTo(Row.of("US", 200)); + } + } + + @Test + void testMultipleSetSemanticTablesWithMatchingPartitionKeys() throws Exception { + try (ProcessTableFunctionTestHarness<Row> harness = + ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class) + 
.withTableArgument("leftTable", DataTypes.of("ROW<name STRING, score INT>")) + .withPartitionBy("leftTable", "name") + .withTableArgument( + "rightTable", DataTypes.of("ROW<name STRING, city STRING>")) + .withPartitionBy("rightTable", "name") Review Comment: nice, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
