xx789633 commented on code in PR #2669:
URL: https://github.com/apache/fluss/pull/2669#discussion_r2805064900


##########
website/docs/quickstart/User-Profile.md:
##########
@@ -0,0 +1,279 @@
+---
+title: Real-Time User Profile
+sidebar_position: 4
+---
+
+# Real-Time User Profile
+
+This tutorial demonstrates how to build a production-grade, real-time user profiling system using Apache Fluss. You will learn how to map high-cardinality string identifiers (like emails) to compact integers and aggregate user behavior directly in the storage layer with exactly-once guarantees.
+
+![arch](/img/user-profile.png)
+
+## How the System Works
+
+### Core Concepts
+*   **Identity Mapping**: High-cardinality strings (emails) ➔ compact 32-bit `INT` UIDs.
+*   **Offloaded Aggregation**: Computation happens in the Fluss TabletServers, keeping the Flink job state-free.
+*   **Optimized Storage**: Native [RoaringBitmap](https://roaringbitmap.org/) support for sub-second unique visitor (UV) counts.
+
+### Data Flow
+1.  **Ingestion**: Raw event streams (e.g., clicks, page views) enter the system from a source such as [Apache Kafka](https://kafka.apache.org/).
+2.  **Mapping**: The [Apache Flink](https://flink.apache.org/) job performs a temporal lookup join against the `user_dict` table. If a user is new, the `insert-if-not-exists` hint triggers Fluss to automatically generate a unique `INT` UID via its auto-increment feature.
+3.  **Merge**: The **Aggregation Merge Engine** updates click counts and bitmaps directly in the storage layer.
+4.  **Recovery**: The **Undo Recovery** mechanism ensures exactly-once accuracy during failovers.
+
+## Environment Setup
+
+### Prerequisites
+
+Before proceeding, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed.
+
+### Starting the Playground
+
+1. Create a working directory.
+   ```shell
+   mkdir fluss-user-profile
+   cd fluss-user-profile
+   ```
+
+2. Set the Fluss version environment variable.
+   ```shell
+   # Use a released version such as 0.9.0, or 0.10-SNAPSHOT for the latest features
+   export FLUSS_DOCKER_VERSION=0.10-SNAPSHOT
+   export FLINK_VERSION="1.20"
+   ```
+   :::note
+   If you open a new terminal window, remember to re-run these export commands.
+   :::
+
+3. Create a `lib` directory and download the required JARs.
+   ```shell
+   mkdir lib
+   # Download Flink Faker for data generation
+   curl -fL -o lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
+   # Download Fluss Connector
+   curl -fL -o "lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" \
+   
"https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/${FLUSS_DOCKER_VERSION}/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar";
+   ```
+   :::note
+   If the Fluss connector JAR for this version is not yet available on Maven Central, copy it from a local build of Fluss instead: `cp ../fluss-flink/fluss-flink-1.20/target/fluss-flink-1.20-0.10-SNAPSHOT.jar lib/`
+   :::
+
+4. Create a `docker-compose.yml` file.
+   ```yaml
+   services:
+     coordinator-server:
+       image: apache/fluss:${FLUSS_DOCKER_VERSION}
+       command: coordinatorServer
+       depends_on:
+         - zookeeper
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: FLUSS://coordinator-server:9123
+           remote.data.dir: /remote-data
+       volumes:
+         - fluss-remote-data:/remote-data
+     tablet-server:
+       image: apache/fluss:${FLUSS_DOCKER_VERSION}
+       command: tabletServer
+       depends_on:
+         - coordinator-server
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: FLUSS://tablet-server:9123
+           data.dir: /tmp/fluss/data
+           remote.data.dir: /remote-data
+       volumes:
+         - fluss-remote-data:/remote-data
+     zookeeper:
+       restart: always
+       image: zookeeper:3.9.2
+     jobmanager:
+       image: flink:${FLINK_VERSION}
+       ports:
+         - "8081:8081"
+       environment:
+         - |
+           FLINK_PROPERTIES=
+           jobmanager.rpc.address: jobmanager
+       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec 
/docker-entrypoint.sh jobmanager"]
+       volumes:
+         - ./lib:/tmp/lib
+         - fluss-remote-data:/remote-data
+     taskmanager:
+       image: flink:${FLINK_VERSION}
+       depends_on:
+         - jobmanager
+       environment:
+         - |
+           FLINK_PROPERTIES=
+           jobmanager.rpc.address: jobmanager
+           taskmanager.numberOfTaskSlots: 2
+       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec 
/docker-entrypoint.sh taskmanager"]
+       volumes:
+         - ./lib:/tmp/lib
+         - fluss-remote-data:/remote-data
+     sql-client:
+       image: flink:${FLINK_VERSION}
+       depends_on:
+         - jobmanager
+       environment:
+         - |
+           FLINK_PROPERTIES=
+           jobmanager.rpc.address: jobmanager
+           rest.address: jobmanager
+       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec 
/docker-entrypoint.sh bin/sql-client.sh"]
+       volumes:
+         - ./lib:/tmp/lib
+         - fluss-remote-data:/remote-data
+
+   volumes:
+     fluss-remote-data:
+   ```
+
+5. Start the environment.
+   ```shell
+   docker compose up -d
+   ```
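+
+   Optionally, verify that all of the containers came up before moving on:
+   ```shell
+   docker compose ps
+   ```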
+
+6. Launch the Flink SQL Client.
+   ```shell
+   # Explicitly include the Fluss JAR in the classpath for the session
+   docker compose run sql-client bin/sql-client.sh -j /tmp/lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar
+   ```
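+
+   Once you see the `Flink SQL>` prompt, you can optionally verify that the connector JAR was picked up:
+   ```sql
+   SHOW JARS;
+   ```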
+
+## Step 1: Create the Fluss Catalog
+
+In the SQL Client, create and use the Fluss catalog.
+
+:::tip
+Run SQL statements one by one to avoid errors.
+:::
+
+```sql
+CREATE CATALOG fluss_catalog WITH (
+    'type' = 'fluss',
+    'bootstrap.servers' = 'coordinator-server:9123'
+);
+```
+
+```sql
+USE CATALOG fluss_catalog;
+```
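+
+As a quick sanity check, listing the databases confirms that the SQL client can reach the Fluss cluster:
+
+```sql
+SHOW DATABASES;
+```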
+
+## Step 2: Create the User Dictionary Table
+
+Create the `user_dict` table to map emails to UIDs. We use `auto-increment.fields` to automatically generate unique IDs.
+
+```sql
+CREATE TABLE user_dict (
+    email STRING,
+    uid INT,
+    PRIMARY KEY (email) NOT ENFORCED
+) WITH (
+    'connector' = 'fluss',
+    'auto-increment.fields' = 'uid',
+    'bucket.num' = '1'
+);
+```
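+
+Once the pipeline in Step 4 is running, you can revisit this table to inspect the generated mappings; each distinct email should appear exactly once with its own auto-generated `uid` (press `Q` to leave the result view):
+
+```sql
+SELECT * FROM user_dict;
+```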
+
+## Step 3: Create the Aggregated Profile Table
+
+Create the `user_profiles` table using the **Aggregation Merge Engine**. This pushes the aggregation logic (summing clicks, unioning bitmaps) down to the Fluss TabletServers.
+
+:::tip
+We use `rbm64` ([RoaringBitmap](https://roaringbitmap.org/)) for efficient unique visitor counting. This allows you to store millions of unique IDs in a highly compressed format.
+:::
+
+```sql
+CREATE TABLE user_profiles (
+    profile_id INT,
+    unique_visitors BYTES,
+    total_clicks BIGINT,
+    PRIMARY KEY (profile_id) NOT ENFORCED
+) WITH (
+    'connector' = 'fluss',
+    'table.merge-engine' = 'aggregation',
+    'fields.unique_visitors.agg' = 'rbm64',
+    'fields.total_clicks.agg' = 'sum',
+    'bucket.num' = '1'
+);
+```
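+
+With the `sum` aggregation, repeated writes for the same `profile_id` are folded into a single running total inside the TabletServer, while the `rbm64` column keeps a compressed set of visitor UIDs. Once data is flowing (Step 4), a simple way to watch the merge happen is to query the totals directly; note that `unique_visitors` is a serialized bitmap, so it renders as raw bytes in the SQL client:
+
+```sql
+SELECT profile_id, total_clicks FROM user_profiles;
+```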
+
+## Step 4: Ingest and Process Data
+
+Create a temporary source table to simulate raw user events using the Faker connector.
+
+```sql
+CREATE TEMPORARY TABLE raw_events (
+    email STRING,
+    click_count INT,
+    profile_group_id INT,
+    proctime AS PROCTIME()
+) WITH (
+    'connector' = 'faker',
+    'rows-per-second' = '1',
+    'fields.email.expression' = '#{internet.emailAddress}',
+    'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}',
+    'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}'
+);
+```
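+
+If you want to see what the generator produces before wiring up the full pipeline, you can preview the stream (press `Q` to stop):
+
+```sql
+SELECT * FROM raw_events;
+```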
+
+Now, run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` on the fly.
+
+```sql
+INSERT INTO user_profiles
+SELECT
+    d.uid,
+    -- Convert INT to BYTES for rbm64. 
+    -- Note: In a real production job, you might use a UDF to ensure correct bitmap initialization.
+    CAST(CAST(d.uid AS STRING) AS BYTES),

Review Comment:
   > I believe we need the `to_rbm` and `from_rbm` Flink UDFs to process the 
data correctly. Without these functions, the results would be meaningless and 
users would not understand the purpose of this feature.
   
   Exactly. we need such functions as RB_CARDINALITY and RB_OR_AGG for 
aggregating the result bitmap.
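
   For example, reading the profile table would only become meaningful with something like this (the function names are hypothetical here, assuming they are registered as Flink UDFs):

   ```sql
   -- Hypothetical usage, assuming an RB_CARDINALITY UDF is registered:
   SELECT profile_id,
          RB_CARDINALITY(unique_visitors) AS uv,  -- distinct-visitor count decoded from the rbm64 bytes
          total_clicks
   FROM user_profiles;
   ```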



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
