twalthr commented on code in PR #26375:
URL: https://github.com/apache/flink/pull/26375#discussion_r2018926286
##########
docs/content/docs/dev/table/functions/ptfs.md:
##########
@@ -0,0 +1,1349 @@
+---
+title: "Process Table Functions"
+weight: 52
+type: docs
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Process Table Functions (PTFs)
+
+Process Table Functions (PTFs) are the most powerful function kind for Flink SQL and Table API. They enable implementing
+user-defined operators that can be as feature-rich as built-in operations. PTFs can take (partitioned) tables to produce
+a new table. They have access to Flink's managed state, event-time and timer services, and underlying table changelogs.
+
+Conceptually, a PTF is itself a [user-defined function]({{< ref "docs/dev/table/functions/udfs" >}}) that is a superset of all
+other user-defined functions. It maps zero, one, or multiple tables to zero, one, or multiple rows (or structured types).
+Scalar arguments are supported. Due to its stateful nature, implementing aggregating behavior is possible as well.
+
+A PTF enables the following tasks:
+- Apply transformations on each row of a table.
+- Logically partition the table into distinct sets and apply transformations per set.
+- Store seen events for repeated access.
+- Continue the processing at a later point in time enabling waiting, synchronization, or timeouts.
+- Buffer and aggregate events using complex state machines or rule-based conditional logic.
+
+{{< top >}}
+
+Polymorphic Table Functions
+---------------------------
+
+PTFs are not exclusive to Flink; their query syntax and semantics are derived from SQL:2016's *Polymorphic Table Functions*.
+Detailed information on the expected behavior and integration of polymorphic table functions within the SQL language can
+be found in [ISO/IEC 19075-7:2021 (Part 7)](https://www.iso.org/standard/78938.html). A publicly available
+summary is provided in [Section 3](https://sigmodrecord.org/publications/sigmodRecord/1806/pdfs/08_Industry_Michels.pdf)
+of the related SIGMOD paper.
+
+While both share the same abbreviation (*PTF*), process table functions in Flink enhance polymorphic table functions by
+incorporating Flink-specific features, such as state management, time, and timer services. Call characteristics, including
+table arguments with row or set semantics, descriptor arguments, and processing concepts related to virtual processors,
+are aligned with the SQL standard.
+
+{{< top >}}
+
+Motivating Examples
+-------------------
+
+The following examples demonstrate how a PTF can accept and transform tables. The `@ArgumentHint` specifies that the function
+accepts a table as an argument, rather than just a scalar row value. In both examples, the `eval()` method is invoked for
+each row in the input table. Additionally, the `@ArgumentHint` not only indicates that the function can process a table,
+but also defines how the function interprets the table - whether using row or set semantics.
+
+### Greeting
+
+An example that adds a greeting to each incoming customer.
+
+{{< tabs "0137eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.*;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.types.Row;
+import static org.apache.flink.table.api.Expressions.*;
+
+// A PTF that takes a table argument, conceptually viewing the table as a row.
+// The result is never stateful and derived purely based on the current row.
+public static class Greeting extends ProcessTableFunction<String> {
+    public void eval(@ArgumentHint(ArgumentTrait.TABLE_AS_ROW) Row input) {
+        collect("Hello " + input.getFieldAs("name") + "!");
+    }
+}
+
+TableEnvironment env = TableEnvironment.create(...);
+
+// Call the PTF with row semantics "inline" (without registration) in Table API
+env.fromValues("Bob", "Alice", "Bob")
+    .as("name")
+    .process(Greeting.class)
+    .execute()
+    .print();
+
+// For SQL, register the PTF upfront
+env.executeSql("CREATE VIEW Names(name) AS VALUES ('Bob'), ('Alice'), ('Bob')");
+env.createFunction("Greeting", Greeting.class);
+
+// Call the PTF with row semantics in SQL
+env.executeSql("SELECT * FROM Greeting(TABLE Names)").print();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The results of both Table API and SQL look similar to:
+
+```text
++----+--------------------------------+
+| op |                         EXPR$0 |
++----+--------------------------------+
+| +I |                     Hello Bob! |
+| +I |                   Hello Alice! |
+| +I |                     Hello Bob! |
++----+--------------------------------+
+```
+
+### Greeting with Memory
+
+An example that adds a greeting for each incoming customer, taking into account whether the customer has been greeted
+previously.
+
+{{< tabs "0237eeed-3d13-455c-8e2f-5e164da9f844" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.*;
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.types.Row;
+import static org.apache.flink.table.api.Expressions.*;
+
+// A PTF that takes a table argument, conceptually viewing the table as a set.
+// The result can be stateful and derived based on the current row and/or
+// previous rows in the set.
+// The call's partitioning defines the size of the set.
+public static class GreetingWithMemory extends ProcessTableFunction<String> {

Review Comment:
   Sure, we can do this as a follow-up. Nevertheless, people also love examples in the docs, just to get a feeling for how it all fits together.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
