gyang94 commented on code in PR #1414: URL: https://github.com/apache/fluss/pull/1414#discussion_r2249250777
########## website/blog/2025-07-28-taotian-practice.md: ##########
@@ -0,0 +1,513 @@

---
slug: taotian-practice
title: "The Implementation Practice of Fluss on the Taotian AB Test Analysis Platform: A Message Queue More Suitable for Real-Time OLAP"
sidebar_label: "The Implementation Practice of Fluss on the Taotian AB Test Analysis Platform: A Message Queue More Suitable for Real-Time OLAP"
authors: [Zhang Xinyu, Wang Lilei]
---

**Introduction:** The Data Development Team of Taotian Group has built a new generation of real-time data warehouse on Fluss, solving the problems of redundant data consumption, difficult data profiling, and the operational burden of large-State jobs. Fluss combines columnar storage with real-time update capabilities and supports column pruning, KV point queries, Delta Join, and lake-stream integration, significantly reducing IO and compute consumption while improving job stability and data profiling efficiency. It has been deployed on the Taotian AB Test Platform, covering core businesses such as search and recommendation, and was verified during the 618 Grand Promotion: traffic at the tens-of-millions scale with second-level latency, a 30% reduction in resource consumption, and a State reduction of over 100 TB. Going forward, the team will continue to deepen the lakehouse architecture and expand into AI scenarios.

# 1. Business Background

// TODO Image Placeholder

The Taotian AB Test Analysis Platform (hereinafter referred to as the Babel Tower) focuses on the A/B data of Taotian's C-end algorithms and aims to promote scientific decision-making by building generalized A/B data capabilities. Since its inception in **2015**, it has continuously and effectively supported the analysis of Taotian's algorithm A/B data for **10 years**. It is currently used in **over 100** A/B testing scenarios across business areas including **search, recommendation, content, user growth, and marketing**.

// TODO Image Placeholder

The Babel Tower provides the following capabilities:

- **AB Data Public Data Warehouse:** Serves the data applications of downstream algorithms, including online traffic splitting, distribution alignment, general features, scenario labels, and other application scenarios.

- **Scientific Experiment Evaluation:** Implements mature, industry-proven scientific evaluation methods, tracks the effects of A/B tests over the long term, and helps businesses obtain reliable experimental evaluation results.

- **Multidimensional Ad Hoc OLAP Self-Service Analysis:** Supports multidimensional, ad hoc OLAP queries through mature data solutions, serving data effectiveness analysis across all clients, businesses, and scenarios.

# 2. Business Pain Points

The real-time data warehouse of the Babel Tower is currently built on Flink, a message queue, and an OLAP engine, where the message queue is TT (an internal Taotian MQ with a Kafka-like architecture) and the OLAP engine is Alibaba Cloud Hologres.

// TODO Image Placeholder

After consuming the message queue data collected from access logs, we perform business logic processing in Flink. However, when the SQL is complex, especially when it contains OrderBy and Join operations, the retraction stream processed by Flink doubles, the Flink state becomes very large, and a large amount of computing resources is consumed. This poses significant challenges for job development and maintenance, and the development cycle is also much longer than that of offline solutions. In addition, the message queue itself still has some limitations; the main problems we encountered are as follows:

## 2.1 Data Consumption Redundancy

In a data warehouse, write-once, read-many is a common access pattern, and each consumer typically needs only a portion of the data. For example, in an exposure job of the Babel Tower, the consumed message queue provides 44 fields, but we only need 13 of them. Because the message queue uses row-based storage, all columns are still read during consumption: roughly 70% of the bytes read are not needed, yet we pay 100% of the consumption cost, which wastes a significant amount of network resources.

In Flink, we attempted to explicitly declare only the required columns in the Source schema and introduced a column-pruning UDF operator to reduce the consumption cost of the data source (a sketch is given below), but in practice the benefit was minimal and the data pipeline became noticeably more complex.

// TODO Image Placeholder
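For illustration, the attempted mitigation looked roughly like the following Flink DDL. The table name, field names, and connector options are hypothetical placeholders (the internal TT connector is not shown here); the point is that even when only a subset of fields is declared and selected, a row-oriented message queue still ships entire rows over the network, so the saving is limited to deserialization rather than IO.

```SQL
-- Hypothetical sketch: declare only the fields the job actually needs on the
-- source table. With a row-oriented MQ the broker still transfers whole rows,
-- so this mainly trims deserialization cost, not network IO.
CREATE TABLE exposure_source (
    log_time STRING,
    user_id  STRING,
    item_id  STRING,
    expo_id  STRING
    -- ... the remaining required fields, 13 in total
) WITH (
    'connector' = 'tt',           -- placeholder for the internal TT connector
    'topic'     = 'exposure_log'  -- hypothetical topic name
);

-- Downstream logic then consumes only the declared columns.
SELECT user_id, item_id, expo_id FROM exposure_source;
```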
## 2.2 Data Profiling Difficulties

### 2.2.1 The MQ Does Not Support Data Point Queries

In data warehouse construction, data profiling is a basic business requirement used for problem location, case troubleshooting, and so on. We have explored two data profiling approaches for message queues in production; both have advantages and disadvantages, and neither fully meets the business requirements.

// TODO Image Placeholder

// TODO Image Placeholder

### 2.2.2 Flink State Data Is Not Visible

In e-commerce scenarios, counting a user's first and last channels on the same day is a key indicator for measuring user acquisition effectiveness and channel quality. To ensure the accuracy of the results, the computing engine must perform sorting and deduplication, materializing all upstream data in Flink State. However, State is an internal black box that we can neither see nor touch, which makes modifying jobs and troubleshooting issues extremely difficult.

// TODO Image Placeholder

## 2.3 Difficulties in Maintaining Large-State Jobs

In e-commerce scenarios, order attribution is the core business logic. In the real-time data warehouse, implementing same-day transaction attribution typically requires a Dual Stream Join (click stream and transaction stream) plus 24 hours of Flink State. The result is an extremely large Flink State (up to **100 TB** for a single job), covering operations such as sorting and the Dual Stream Join.

// TODO Image Placeholder

A running Flink job requires State to maintain its intermediate result sets. When a job is modified, it can resume from the latest state only if the execution plan validation succeeds; whenever the validation fails, the State has to be **re-initialized from 00:00**, which is time-consuming and labor-intensive. Moreover, every consumed record updates both the Sort State and the Join State: the Sort State of the sorting operator can reach **90 TB**, and the Join State can be as high as **10 TB**. Such a huge state brings many problems, including high cost, poor job stability, Checkpoint timeouts, and slow restart and recovery.

// TODO Image Placeholder
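To make the shape of such a job concrete, here is a minimal Flink SQL sketch of same-day attribution as a deduplication plus a dual-stream join. The table and column names are hypothetical, and the state TTL stands in for the 24-hour retention described above; the deduplication (sorting) operator and the join operator each materialize their inputs in Flink State, which is where the 90 TB and 10 TB figures come from.

```SQL
-- Hypothetical sketch of same-day order attribution. Both the deduplication
-- operator and the dual-stream join operator keep their inputs in Flink State.
SET 'table.exec.state.ttl' = '24h';

SELECT
    t.order_id,
    t.user_id,
    c.channel    AS attributed_channel,
    c.click_time AS attributed_click_time
FROM trade_stream AS t
JOIN (
    -- Keep the latest click per (user_id, item_id); this Top-N/deduplication
    -- operator is the source of the large "Sort State".
    SELECT user_id, item_id, channel, click_time
    FROM (
        SELECT user_id, item_id, channel, click_time,
               ROW_NUMBER() OVER (
                   PARTITION BY user_id, item_id
                   ORDER BY click_time DESC) AS rn
        FROM click_stream
    )
    WHERE rn = 1
) AS c
ON t.user_id = c.user_id AND t.item_id = c.item_id;
```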
# 3. Fluss Core Competencies

## 3.1 What is Fluss?

> Fluss Official Documentation: [https://fluss.apache.org/](https://fluss.apache.org/)

> GitHub: [https://github.com/apache/fluss](https://github.com/apache/fluss)

// TODO Image Placeholder

Fluss, developed by the Flink team, is a next-generation stream storage built for real-time analytics. Fluss innovatively combines a columnar storage format with real-time update capabilities and is deeply integrated with Flink, helping users build streaming data warehouses with high throughput, low latency, and low cost. It has the following core features:

- Real-time read and write: Supports millisecond-level streaming reads and writes.

- Column pruning: Stores real-time stream data in a columnar format. Column pruning can improve read performance by up to 10x and reduce network cost (see the sketch after this list).

- Streaming updates: Supports real-time streaming updates on large-scale data, including partial column updates for low-cost wide-table stitching.

- CDC subscription: Updates generate a complete changelog (CDC); by consuming the CDC stream with Flink, data can flow in real time across the entire data warehouse pipeline.

- Real-time point query: Supports high-performance primary key point lookups, so tables can serve as dimension tables in real-time processing pipelines.

- Unified lake and stream: Integrates seamlessly with the Lakehouse and provides a real-time data layer for it. This not only brings low-latency data to Lakehouse analytics but also gives stream storage powerful analytical capabilities.
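As a quick illustration of the read path, the sketch below registers a Fluss catalog in Flink and streams only two columns out of a wide log table. The bootstrap address and table name are hypothetical; the catalog options follow the pattern shown in the Fluss documentation. Because log tables are stored in columnar format, only the selected columns are read and transferred.

```SQL
-- Hypothetical sketch: register a Fluss catalog in Flink SQL and read a wide
-- log table with column pruning. Only user_id and item_id are scanned, even
-- if the table has dozens of other columns.
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'fluss-server:9123'  -- placeholder address
);

USE CATALOG fluss_catalog;

-- Streaming read; unselected columns are pruned at the storage layer.
SELECT user_id, item_id
FROM exposure_log;  -- hypothetical wide log table
```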
## 3.2 Table Types

// TODO Image Placeholder

**Type:** Tables are divided into log tables and primary key tables. Log tables are columnar MQs that only support insert operations, while primary key tables can be updated according to the primary key and the specified merge engine.

**Partition:** Divides data into smaller, more manageable subsets according to specified columns. Fluss supports flexible partitioning strategies, such as dynamically created partitions; for the latest documentation on partitioning, please refer to [https://fluss.apache.org/docs/table-design/data-distribution/partitioning/](https://fluss.apache.org/docs/table-design/data-distribution/partitioning/). Note that partition columns must be of STRING type. A partitioned table can be defined with the following SQL:

```SQL
CREATE TABLE temp(
    dt STRING
) PARTITIONED BY (dt)
WITH (
    'table.auto-partition.num-precreate' = '2' -- Pre-create 2 partitions
    ,'table.auto-partition.num-retention' = '2' -- Retain the most recent 2 partitions
    ,'table.auto-partition.enabled' = 'true' -- Enable automatic partitioning
);
```

**Bucket:** The smallest unit of read and write operations. For a primary key table, the bucket a record belongs to is determined by the hash of its primary key. For a log table, the bucketing column(s) can be specified in the WITH options when creating the table; otherwise records are scattered randomly across buckets.

### 3.2.1 Log Table

The log table is a commonly used table type in Fluss. Data is stored in write order; only insert operations are supported, and update/delete operations are not, similar to MQ systems such as Kafka and TT. For log tables, most of the data is currently uploaded to remote storage, and only a portion is kept locally. For example, for a log table with 128 buckets, only 128 (buckets) × 2 (segments retained locally) × 1 GB (segment size) × 3 (replicas) = 768 GB is stored locally. The table creation statement is as follows:

```SQL
CREATE TABLE `temp` (
    second_timestamp STRING
    ,pk STRING
    ,assist STRING
    ,user_name STRING
)
WITH (
    'bucket.num' = '256' -- Number of buckets
    ,'table.log.ttl' = '2d' -- Log TTL; the default is 7 days
    ,'table.log.arrow.compression.type' = 'ZSTD' -- Compression codec, enabled by default
    ,'client.writer.bucket.no-key-assigner' = 'sticky' -- Bucket assigner used when no bucket key is set
);
```

In Fluss, the log table is stored in the Apache Arrow columnar format by default. Data is laid out column by column rather than row by row, which enables **column pruning**: only the required columns are read during real-time consumption, reducing IO overhead, improving performance, and lowering resource usage.

### 3.2.2 PrimaryKey Table

Compared with the log table, the primary key table supports insert, update, and delete operations, and different merge behaviors can be selected by specifying the merge engine. Note that `bucket.key` and the partition key must be subsets of the primary key, and if the table is used for Delta Join, `bucket.key` must be a prefix of the primary key. The table creation statement is as follows:

```SQL
CREATE TABLE temp (
    pk STRING
    ,user_name STRING
    ,item_id STRING
    ,event_time BIGINT
    ,dt STRING
    ,PRIMARY KEY (user_name,item_id,pk,dt) NOT ENFORCED
) PARTITIONED BY (dt)
WITH (
    'bucket.num' = '512'
    ,'bucket.key' = 'user_name,item_id'
    ,'table.auto-partition.enabled' = 'true'
    ,'table.auto-partition.time-unit' = 'day'
    ,'table.log.ttl' = '1d' -- binlog保留1天 (retain the binlog for 1 day)

Review Comment:
    characters in code. :)
