gyang94 commented on code in PR #1414:
URL: https://github.com/apache/fluss/pull/1414#discussion_r2249250777


##########
website/blog/2025-07-28-taotian-practice.md:
##########
@@ -0,0 +1,513 @@
+---
+slug: taotian-practice
+title: "The Implementation Practice of Fluss on the Taotian AB Test Analysis Platform: A Message Queue More Suitable for Real-Time OLAP"
+sidebar_label: "Fluss on the Taotian AB Test Analysis Platform"
+authors: [Zhang Xinyu, Wang Lilei]
+---
+
+
+**Introduction:** The Data Development Team of Taotian Group has built a new generation of real-time data warehouse on Fluss, solving the problems of redundant data consumption, difficult data profiling, and the operational burden of large-State jobs. Fluss integrates columnar storage and real-time update capabilities, and supports column pruning, KV point queries, Delta Join, and lake-stream unification, significantly reducing IO and compute consumption while improving job stability and data profiling efficiency. It has been deployed on the Taotian AB Test Analysis Platform, covering core businesses such as search and recommendation, and was verified during the 618 Grand Promotion: it handled tens of millions of records of traffic with second-level latency, cut resource consumption by 30%, and reduced State by over 100TB. Going forward, the team will continue to deepen the lakehouse architecture and expand into AI scenarios.
+
+# 1. Business Background
+
+
+// TODO Image Placeholder
+
+
+The Taotian AB Test Analysis Platform (hereinafter referred to as the Babel Tower) focuses on AB data for Taotian's C-end algorithms, aiming to promote scientific decision-making through generalized AB data capabilities. Since its inception in **2015**, it has continuously and effectively supported the analysis of Taotian's algorithm AB data for **10 years**. It is currently applied in **over 100** A/B testing scenarios across business areas including **search, recommendation, content, user growth, and marketing**.
+
+
+// TODO Image Placeholder
+
+
+The Babel Tower provides the following capabilities:
+
+- **AB Data Public Data Warehouse:** Serves the data applications of downstream algorithms, including online traffic splitting, distribution alignment, general features, scenario labels, and other application scenarios.
+
+- **Scientific Experiment Evaluation:** Implements mature scientific evaluation solutions from the industry, tracks the effects of AB tests over the long term, and helps businesses obtain real and reliable experimental evaluation results.
+
+- **Multidimensional Ad Hoc OLAP Self-Service Analysis:** Through mature data solutions, supports multidimensional, ad hoc OLAP queries, serving data effectiveness analysis across all clients, businesses, and scenarios.
+
+
+# 2. Business Pain Points
+
+Currently, the real-time data warehouse of the Babel Tower is built on a technology stack of Flink, a message queue, and an OLAP engine, where the message queue is TT (an internal Kafka-like MQ within Taotian Group) and the OLAP engine is Alibaba Cloud Hologres.
+
+
+// TODO Image Placeholder
+
+
+After consuming the message queue data collected from access logs, we perform business logic processing in Flink. However, when the SQL is complex, especially when it contains OrderBy and Join operations, the retraction stream processed by Flink doubles the data volume, resulting in a very large Flink State and heavy consumption of computing resources. This poses significant challenges for job development and maintenance, and the development cycle of such jobs is much longer than that of offline solutions. The message queue itself also has limitations; the main problems we encountered are as follows:
+
+## 2.1 Data Consumption Redundancy
+
+In a data warehouse, write-once, read-many is a common access pattern, and each consumer typically needs only a portion of the data. For example, in an exposure job of the Babel Tower, the consumed message queue provides 44 fields, but we only need 13 of them. Because the message queue uses row-based storage, consumers must still read all columns: roughly 70% of the bytes read are unneeded, yet we pay 100% of the IO cost. This leads to a significant waste of network resources.
+
+In Flink, we attempted to explicitly declare only the needed columns in the Source schema and introduce a column pruning UDF operator to reduce the consumption cost of the data source, but in practice the savings were minimal and the pipeline complexity increased.
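+
+As an illustration of why this helps little, here is a minimal, hypothetical sketch using the open-source Kafka connector (TT is internal, but the idea is the same): declaring only the needed columns prunes them at deserialization time, yet the broker still ships every byte of each row, so network IO is not reduced.
+
+```SQL
+-- Hypothetical source DDL: only the needed columns are declared,
+-- and the other fields are simply omitted from the schema.
+-- Parsing is cheaper, but the full rows still cross the network.
+CREATE TABLE exposure_src (
+  user_id     STRING
+  ,item_id    STRING
+  ,event_time BIGINT
+  -- ... the remaining needed fields
+) WITH (
+  'connector' = 'kafka'
+  ,'topic' = 'exposure_log'
+  ,'properties.bootstrap.servers' = 'broker:9092'
+  ,'format' = 'json'
+  ,'json.ignore-parse-errors' = 'true'
+);
+```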
+
+// TODO Image Placeholder
+
+## 2.2 Data Profiling Difficulties
+
+### 2.2.1 MQ Does Not Support Data Point Queries
+
+In data warehouse construction, data profiling is a basic business requirement, used for problem localization, case troubleshooting, and so on. We have explored two data profiling methods for message queues in production practice; each has advantages and disadvantages, and neither fully meets business requirements.
+
+// TODO Image Placeholder
+
+// TODO Image Placeholder
+
+
+### 2.2.2 Flink State Data Is Not Visible
+
+In e-commerce scenarios, counting a user's first and last channels on the same day is a key indicator for measuring user acquisition effectiveness and channel quality. To ensure accurate results, the computing engine must perform sorting and deduplication, materializing all upstream data in Flink State. However, State is an internal black box that we can neither see nor touch, making it extremely difficult to modify jobs or troubleshoot issues.
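+
+For context, the same-day first/last channel logic is typically expressed as a deduplication query like the following minimal sketch (table and column names are hypothetical). The ROW_NUMBER operator materializes every upstream row in State, and that State is opaque to the developer:
+
+```SQL
+-- First channel per user per day; ORDER BY event_time DESC gives the last.
+SELECT user_id, channel, event_time
+FROM (
+  SELECT user_id, channel, event_time
+    ,ROW_NUMBER() OVER (
+      PARTITION BY user_id, dt
+      ORDER BY event_time ASC
+    ) AS rn
+  FROM channel_log
+) WHERE rn = 1;
+```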
+
+
+// TODO Image Placeholder
+
+
+## 2.3 Difficulties in Maintenance of Large State Jobs
+
+In e-commerce scenarios, order attribution is the core business logic. In the real-time data warehouse, implementing same-day transaction attribution often requires a Dual Stream Join (click stream + transaction stream) with 24-hour Flink State. The result is an extremely large Flink State (**up to 100TB** for a single job), covering operations such as sorting and the Dual Stream Join.
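+
+A minimal sketch of this pattern (hypothetical table and column names): a regular dual-stream join with a 24-hour state TTL, which forces Flink to materialize both streams in State for a full day.
+
+```SQL
+-- Both sides of a regular join are fully kept in State for the TTL window.
+SET 'table.exec.state.ttl' = '24 h';
+
+INSERT INTO order_attribution
+SELECT t.order_id, c.channel, c.click_time
+FROM transactions AS t
+JOIN clicks AS c
+  ON t.user_id = c.user_id AND t.item_id = c.item_id;
+```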
+
+
+// TODO Image Placeholder
+
+
+A Flink job requires State to maintain intermediate result sets. When a job is modified and the execution plan verification succeeds, the job can resume from the latest state; whenever verification fails, the State must be **rebuilt from scratch by re-consuming data from 00:00**, which is time-consuming and labor-intensive. Moreover, each consumed record updates both the Sort State and the Join State: the Sort State of the sorting operator can reach **90TB**, and the Join State can be as high as **10TB**. This huge state brings many problems, including high cost, poor job stability, Checkpoint timeouts, and slow restart and recovery.
+
+
+// TODO Image Placeholder
+
+
+# 3. Fluss Core Capabilities
+
+## 3.1 What is Fluss?
+
+> Fluss Official Documentation: 
[https://fluss.apache.org/](https://fluss.apache.org/)
+
+> GitHub: [https://github.com/apache/fluss](https://github.com/apache/fluss)
+
+
+
+// TODO Image Placeholder
+
+
+Fluss, developed by the Flink team, is a next-generation stream storage built for real-time analytics. It innovatively brings columnar storage and real-time update capabilities into stream storage, and its deep integration with Flink helps users build a streaming data warehouse with high throughput, low latency, and low cost. Its core features are:
+
+- Real-time read and write: Supports millisecond-level streaming reads and writes.
+
+- Columnar pruning: Stores real-time stream data in columnar format; column pruning can improve read performance by up to 10x and reduce network costs.
+
+- Streaming updates: Supports real-time streaming updates for large-scale data, including partial column updates for low-cost wide-table stitching.
+
+- CDC subscription: Updates generate a complete changelog (CDC), and by consuming the CDC with Flink streaming jobs, data can flow in real time across the whole pipeline of the data warehouse.
+
+- Real-time point query: Supports high-performance primary key point queries, so a table can serve as a dimension table in real-time processing pipelines (see the lookup join sketch after this list).
+
+- Unified lake and stream: Seamlessly integrates with the Lakehouse and provides its real-time data layer. This brings low-latency data to Lakehouse analytics and endows stream storage with powerful analytical capabilities.
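+
+As a hedged illustration of the point query capability (hypothetical table and column names): a Fluss primary key table can back a Flink lookup join, so each incoming record fetches the latest dimension row by primary key.
+
+```SQL
+-- Lookup join against a Fluss primary key table (user_dim); each order
+-- row triggers a point query on user_id. proc_time is assumed to be a
+-- processing-time attribute of the orders table.
+SELECT o.order_id, d.user_level
+FROM orders AS o
+JOIN user_dim FOR SYSTEM_TIME AS OF o.proc_time AS d
+  ON o.user_id = d.user_id;
+```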
+
+
+## 3.2 Table Type
+
+
+
+// TODO Image Placeholder
+
+
+**Type:** Tables are divided into log tables and primary key tables. A log table is a columnar MQ that only supports insert operations, while a primary key table can be updated according to its primary key and the specified merge engine.
+
+**Partition:** Divides data into smaller, more manageable subsets according to specified columns. Fluss supports diverse partitioning strategies, such as dynamic partition creation. For the latest documentation on partitioning, see [https://fluss.apache.org/docs/table-design/data-distribution/partitioning/](https://fluss.apache.org/docs/table-design/data-distribution/partitioning/). Note that partition columns must be of STRING type; a partitioned table can be defined via the following SQL:
+
+```SQL
+CREATE TABLE temp(
+  dt STRING
+) PARTITIONED BY (dt)
+WITH (
+  'table.auto-partition.num-precreate' = '2' -- Pre-create 2 partitions in advance
+  ,'table.auto-partition.num-retention' = '2' -- Retain the previous 2 partitions
+  ,'table.auto-partition.enabled' = 'true' -- Enable automatic partitioning
+);
+```
+
+**Bucket:** The smallest unit of read and write operations. For a primary key table, the bucket of each record is determined by the hash of its primary key. For a log table, the bucketing columns can be specified in the WITH clause when creating the table; otherwise, records are scattered randomly across buckets.
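+
+A minimal sketch (hypothetical table) of pinning the bucketing column of a log table, assuming the `bucket.key` setting, so that records with the same `user_name` land in the same bucket:
+
+```SQL
+CREATE TABLE clicks_log (
+  user_name STRING
+  ,url      STRING
+) WITH (
+  'bucket.num' = '128'
+  ,'bucket.key' = 'user_name' -- Without this, records are scattered randomly
+);
+```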
+
+### 3.2.1 Log Table
+
+The log table is a commonly used table type in Fluss. It stores records in write order, supports only insert operations (no update/delete), and is similar to MQ systems such as Kafka and TT. For log tables, most of the data is currently uploaded to remote storage, and only a portion is kept locally. For example, for a log table with 128 buckets, only 128 buckets \* 2 (segments retained locally) \* 1 GB (size of one segment) \* 3 (replicas) = 768 GB is stored locally. The table creation statement is as follows:
+
+```SQL
+CREATE TABLE `temp` (
+  second_timestamp  STRING
+  ,pk               STRING
+  ,assist           STRING
+  ,user_name        STRING
+)
+with (
+  'bucket.num' = '256' -- Number of buckets
+  ,'table.log.ttl' = '2d' -- Log TTL; the default is 7 days
+  ,'table.log.arrow.compression.type' = 'ZSTD' -- Compression codec; currently the default
+  ,'client.writer.bucket.no-key-assigner' = 'sticky' -- Bucket assignment mode when no bucket key is set
+);
+```
+
+In Fluss, log tables are stored in the Apache Arrow columnar format by default. This format stores data column by column rather than row by row, enabling **column pruning**: only the required columns are read during real-time consumption, which reduces IO overhead, improves performance, and lowers resource usage.
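+
+For example (using the `temp` table created above), projecting columns in the consuming query is pushed down to storage, so only the selected columns are read and shipped:
+
+```SQL
+-- Only the pk and user_name columns are read from the columnar log.
+SELECT pk, user_name FROM temp;
+```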
+
+### 3.2.2 PrimaryKey Table
+
+Compared to the log table, the primary key table supports insert, update, and delete operations, with different merge behaviors implemented by specifying the Merge Engine. Note that `bucket.key` and the partition columns must be subsets of the primary key, and if the KV table is to be used for Delta Join, `bucket.key` must be a prefix of the primary key. The table creation statement is as follows:
+
+```SQL
+CREATE TABLE temp (
+  pk                STRING
+  ,user_name        STRING
+  ,item_id          STRING
+  ,event_time       BIGINT
+  ,dt               STRING
+  ,PRIMARY KEY (user_name,item_id,pk,dt) NOT ENFORCED
+) PARTITIONED BY (dt)
+WITH (
+  'bucket.num' = '512'
+  ,'bucket.key' = 'user_name,item_id'
+  ,'table.auto-partition.enabled' = 'true'
+  ,'table.auto-partition.time-unit' = 'day'
+  ,'table.log.ttl' = '1d' -- Retain the binlog for 1 day

Review Comment:
   There are Chinese characters in code. :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
