wuchong commented on code in PR #2669:
URL: https://github.com/apache/fluss/pull/2669#discussion_r2805038770


##########
website/docs/quickstart/User-Profile.md:
##########
@@ -0,0 +1,279 @@
+---
+title: Real-Time User Profile
+sidebar_position: 4
+---
+
+# Real-Time User Profile
+
+This tutorial demonstrates how to build a production-grade, real-time user profiling system using Apache Fluss. You will learn how to map high-cardinality string identifiers (like emails) to compact integers and aggregate user behavior directly in the storage layer with exactly-once guarantees.
+
+![arch](/img/user-profile.png)
+
+## How the System Works
+
+### Core Concepts
+*   **Identity Mapping**: High-cardinality strings (emails) ➔ compact 32-bit `INT` UIDs.
+*   **Offloaded Aggregation**: Computation happens in Fluss TabletServers, keeping Flink state-free.
+*   **Optimized Storage**: Native [RoaringBitmap](https://roaringbitmap.org/) support for sub-second unique visitor (UV) counts.
+
+### Data Flow
+1.  **Ingestion**: Raw event streams (e.g., clicks, page views) enter the system from a source like [Apache Kafka](https://kafka.apache.org/).
+2.  **Mapping**: The [Apache Flink](https://flink.apache.org/) job performs a temporal lookup join against the `user_dict` table. If a user is new, the `lookup.insert-if-not-exists` hint triggers Fluss to automatically generate a unique `INT` UID using its auto-increment feature.
+3.  **Merge**: The **Aggregation Merge Engine** updates click counts and bitmaps directly in the storage layer.
+4.  **Recovery**: The **Undo Recovery** mechanism ensures exactly-once accuracy during failovers.
+
+## Environment Setup
+
+### Prerequisites
+
+Before proceeding, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed.
+
+### Starting the Playground
+
+1. Create a working directory.
+   ```shell
+   mkdir fluss-user-profile
+   cd fluss-user-profile
+   ```
+
+2. Set the Fluss and Flink version environment variables.
+   ```shell
+   # Set to 0.9.0 or 0.10-SNAPSHOT for latest features
+   export FLUSS_DOCKER_VERSION=0.10-SNAPSHOT
+   export FLINK_VERSION="1.20"
+   ```
+   :::note
+   If you open a new terminal window, remember to re-run these export commands.
+   :::
+
+3. Create a `lib` directory and download the required JARs.
+   ```shell
+   mkdir lib
+   # Download Flink Faker for data generation
+   curl -fL -o lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
+   # Download Fluss Connector
+   curl -fL -o "lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" \
+   
"https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/${FLUSS_DOCKER_VERSION}/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar";
+   ```
+   :::note
+   For now, if the snapshot connector is not yet published to Maven Central, copy it from a local Fluss build instead: `cp ../fluss-flink/fluss-flink-1.20/target/fluss-flink-1.20-0.10-SNAPSHOT.jar lib/`
+   :::
+
+4. Create a `docker-compose.yml` file.
+   ```yaml
+   services:
+     coordinator-server:
+       image: apache/fluss:${FLUSS_DOCKER_VERSION}
+       command: coordinatorServer
+       depends_on:
+         - zookeeper
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: FLUSS://coordinator-server:9123
+           remote.data.dir: /remote-data
+       volumes:
+         - fluss-remote-data:/remote-data
+     tablet-server:
+       image: apache/fluss:${FLUSS_DOCKER_VERSION}
+       command: tabletServer
+       depends_on:
+         - coordinator-server
+       environment:
+         - |
+           FLUSS_PROPERTIES=
+           zookeeper.address: zookeeper:2181
+           bind.listeners: FLUSS://tablet-server:9123
+           data.dir: /tmp/fluss/data
+           remote.data.dir: /remote-data
+       volumes:
+         - fluss-remote-data:/remote-data
+     zookeeper:
+       restart: always
+       image: zookeeper:3.9.2
+     jobmanager:
+       image: flink:${FLINK_VERSION}
+       ports:
+         - "8081:8081"
+       environment:
+         - |
+           FLINK_PROPERTIES=
+           jobmanager.rpc.address: jobmanager
+       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec 
/docker-entrypoint.sh jobmanager"]
+       volumes:
+         - ./lib:/tmp/lib
+         - fluss-remote-data:/remote-data
+     taskmanager:
+       image: flink:${FLINK_VERSION}
+       depends_on:
+         - jobmanager
+       environment:
+         - |
+           FLINK_PROPERTIES=
+           jobmanager.rpc.address: jobmanager
+           taskmanager.numberOfTaskSlots: 2
+       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec 
/docker-entrypoint.sh taskmanager"]
+       volumes:
+         - ./lib:/tmp/lib
+         - fluss-remote-data:/remote-data
+     sql-client:
+       image: flink:${FLINK_VERSION}
+       depends_on:
+         - jobmanager
+       environment:
+         - |
+           FLINK_PROPERTIES=
+           jobmanager.rpc.address: jobmanager
+           rest.address: jobmanager
+       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec 
/docker-entrypoint.sh bin/sql-client.sh"]
+       volumes:
+         - ./lib:/tmp/lib
+         - fluss-remote-data:/remote-data
+
+   volumes:
+     fluss-remote-data:
+   ```
+
+5. Start the environment.
+   ```shell
+   docker compose up -d
+   ```
+
+6. Launch the Flink SQL Client.
+   ```shell
+   # Explicitly include the Fluss JAR in the classpath for the session
   docker compose run sql-client bin/sql-client.sh -j /tmp/lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar
+   ```
+
+## Step 1: Create the Fluss Catalog
+
+In the SQL Client, create and use the Fluss catalog.
+
+:::tip
+Run SQL statements one by one to avoid errors.
+:::
+
+```sql
+CREATE CATALOG fluss_catalog WITH (
+    'type' = 'fluss',
+    'bootstrap.servers' = 'coordinator-server:9123'
+);
+```
+
+```sql
+USE CATALOG fluss_catalog;
+```
+
+## Step 2: Create the User Dictionary Table
+
Create the `user_dict` table to map emails to UIDs. We use `auto-increment.fields` to automatically generate unique IDs.
+
+```sql
+CREATE TABLE user_dict (
+    email STRING,
+    uid INT,
+    PRIMARY KEY (email) NOT ENFORCED
+) WITH (
+    'connector' = 'fluss',
+    'auto-increment.fields' = 'uid',
+    'bucket.num' = '1'
+);
+```
+
+## Step 3: Create the Aggregated Profile Table
+
Create the `user_profiles` table using the **Aggregation Merge Engine**. This pushes the aggregation logic (summing clicks, unioning bitmaps) to the Fluss TabletServers.
+
+:::tip
We use `rbm64` ([RoaringBitmap](https://roaringbitmap.org/)) for efficient unique visitor counting. This allows you to store millions of unique IDs in a highly compressed format.
+
+```sql
+CREATE TABLE user_profiles (
+    profile_id INT,
+    unique_visitors BYTES,
+    total_clicks BIGINT,
+    PRIMARY KEY (profile_id) NOT ENFORCED
+) WITH (
+    'connector' = 'fluss',
+    'table.merge-engine' = 'aggregation',
+    'fields.unique_visitors.agg' = 'rbm64',
+    'fields.total_clicks.agg' = 'sum',
+    'bucket.num' = '1'
+);
+```
+
+## Step 4: Ingest and Process Data
+
Create a temporary source table to simulate raw user events using the Faker connector.
+
+```sql
+CREATE TEMPORARY TABLE raw_events (
+    email STRING,
+    click_count INT,
+    profile_group_id INT,
+    proctime AS PROCTIME()
+) WITH (
+    'connector' = 'faker',
+    'rows-per-second' = '1',
+    'fields.email.expression' = '#{internet.emailAddress}',
+    'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}',
+    'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}'
+);
+```
+
Now, run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` on the fly.
+
+```sql
+INSERT INTO user_profiles
+SELECT
+    d.uid,
+    -- Convert INT to BYTES for rbm64.
+    -- Note: In a real production job, you might use a UDF to ensure correct bitmap initialization.
+    CAST(CAST(d.uid AS STRING) AS BYTES),

Review Comment:
   I believe we need the `to_rbm` and `from_rbm` Flink UDFs to process the data 
correctly. Without these functions, the results would be meaningless and users 
would not understand the purpose of this feature.
   
   However, shipping Flink UDFs falls outside the scope of the Fluss project. I 
will coordinate with members of the Flink community to contribute these UDFs 
and identify an appropriate location to open source and publish the UDF JARs. 
Once available, we can reference these functions in our documentation and 
examples.
   
   That said, the Lunar New Year holiday is approaching in China, so we likely will not be able to start this work until March. Until then, we may need to suspend this PR. Thank you for the quick updates.
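
   To make the discussion concrete, here is a rough, hypothetical sketch of what a `to_rbm` scalar UDF could look like: it wraps a single `uid` into a serialized RoaringBitmap so the value can be merged by the `rbm64` aggregation. The package, class name, and serialization format are assumptions for illustration only; the actual UDFs still need to be designed and contributed as described above.

   ```java
   // Hypothetical sketch only -- not part of Fluss or Flink today.
   // Wraps one 64-bit uid into a serialized single-element RoaringBitmap.
   package com.example.udf;

   import org.apache.flink.table.functions.ScalarFunction;
   import org.roaringbitmap.longlong.Roaring64NavigableMap;

   import java.io.ByteArrayOutputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.io.UncheckedIOException;

   public class ToRbm extends ScalarFunction {

       public byte[] eval(Long uid) {
           if (uid == null) {
               return null;
           }
           // Build a bitmap containing just this uid.
           Roaring64NavigableMap bitmap = new Roaring64NavigableMap();
           bitmap.addLong(uid);
           // Serialize to bytes so the storage layer can union it with existing bitmaps.
           try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream dos = new DataOutputStream(bos)) {
               bitmap.serialize(dos);
               dos.flush();
               return bos.toByteArray();
           } catch (IOException e) {
               throw new UncheckedIOException("Failed to serialize bitmap", e);
           }
       }
   }
   ```

   A job would then register it with something like `CREATE TEMPORARY FUNCTION to_rbm AS 'com.example.udf.ToRbm';` (class name assumed) and call `to_rbm(d.uid)` instead of the `CAST(... AS BYTES)` workaround. The serialization format would of course have to match whatever Fluss expects for `rbm64` columns.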


