xx789633 commented on code in PR #2669:
URL: https://github.com/apache/fluss/pull/2669#discussion_r2805064900
########## website/docs/quickstart/User-Profile.md ##########

@@ -0,0 +1,279 @@

---
title: Real-Time User Profile
sidebar_position: 4
---

# Real-Time User Profile

This tutorial demonstrates how to build a production-grade, real-time user profiling system using Apache Fluss. You will learn how to map high-cardinality string identifiers (such as emails) to compact integers and how to aggregate user behavior directly in the storage layer with exactly-once guarantees.

## How the System Works

### Core Concepts

* **Identity Mapping**: High-cardinality strings (emails) ➔ compact 32-bit `INT` UIDs.
* **Offloaded Aggregation**: Computation happens in the Fluss TabletServers, keeping the Flink job state-free.
* **Optimized Storage**: Native [RoaringBitmap](https://roaringbitmap.org/) support enables sub-second unique visitor (UV) counts.

### Data Flow

1. **Ingestion**: Raw event streams (e.g., clicks, page views) enter the system from a source like [Apache Kafka](https://kafka.apache.org/).
2. **Mapping**: The [Apache Flink](https://flink.apache.org/) job performs a temporal lookup join against the `user_dict` table. If a user is new, the `insert-if-not-exists` hint triggers Fluss to automatically generate a unique `INT` UID using its Auto-Increment feature.
3. **Merge**: The **Aggregation Merge Engine** updates click counts and bitmaps in the storage layer.
4. **Recovery**: The **Undo Recovery** mechanism ensures exactly-once accuracy during failovers.

## Environment Setup

### Prerequisites

Before proceeding, ensure that [Docker](https://docs.docker.com/engine/install/) and the [Docker Compose plugin](https://docs.docker.com/compose/install/linux/) are installed.

### Starting the Playground

1. Create a working directory.
   ```shell
   mkdir fluss-user-profile
   cd fluss-user-profile
   ```

2. Set the Fluss and Flink version environment variables.
   ```shell
   # Use 0.9.0, or 0.10-SNAPSHOT for the latest features
   export FLUSS_DOCKER_VERSION=0.10-SNAPSHOT
   export FLINK_VERSION="1.20"
   ```
   :::note
   If you open a new terminal window, remember to re-run these export commands.
   :::

3. Create a `lib` directory and download the required JARs.
   ```shell
   mkdir lib
   # Download flink-faker for data generation
   curl -fL -o lib/flink-faker-0.5.3.jar https://github.com/knaufk/flink-faker/releases/download/v0.5.3/flink-faker-0.5.3.jar
   # Download the Fluss connector
   curl -fL -o "lib/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar" \
     "https://repo1.maven.org/maven2/org/apache/fluss/fluss-flink-${FLINK_VERSION}/${FLUSS_DOCKER_VERSION}/fluss-flink-${FLINK_VERSION}-${FLUSS_DOCKER_VERSION}.jar"
   ```
   :::note
   For now, the `0.10-SNAPSHOT` connector is not published to Maven Central; copy it from a local build instead:
   `cp ../fluss-flink/fluss-flink-1.20/target/fluss-flink-1.20-0.10-SNAPSHOT.jar lib/`
   :::

4. Create a `docker-compose.yml` file.
   ```yaml
   services:
     coordinator-server:
       image: apache/fluss:${FLUSS_DOCKER_VERSION}
       command: coordinatorServer
       depends_on:
         - zookeeper
       environment:
         - |
           FLUSS_PROPERTIES=
           zookeeper.address: zookeeper:2181
           bind.listeners: FLUSS://coordinator-server:9123
           remote.data.dir: /remote-data
       volumes:
         - fluss-remote-data:/remote-data
     tablet-server:
       image: apache/fluss:${FLUSS_DOCKER_VERSION}
       command: tabletServer
       depends_on:
         - coordinator-server
       environment:
         - |
           FLUSS_PROPERTIES=
           zookeeper.address: zookeeper:2181
           bind.listeners: FLUSS://tablet-server:9123
           data.dir: /tmp/fluss/data
           remote.data.dir: /remote-data
       volumes:
         - fluss-remote-data:/remote-data
     zookeeper:
       restart: always
       image: zookeeper:3.9.2
     jobmanager:
       image: flink:${FLINK_VERSION}
       ports:
         - "8081:8081"
       environment:
         - |
           FLINK_PROPERTIES=
           jobmanager.rpc.address: jobmanager
       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh jobmanager"]
       volumes:
         - ./lib:/tmp/lib
         - fluss-remote-data:/remote-data
     taskmanager:
       image: flink:${FLINK_VERSION}
       depends_on:
         - jobmanager
       environment:
         - |
           FLINK_PROPERTIES=
           jobmanager.rpc.address: jobmanager
           taskmanager.numberOfTaskSlots: 2
       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh taskmanager"]
       volumes:
         - ./lib:/tmp/lib
         - fluss-remote-data:/remote-data
     sql-client:
       image: flink:${FLINK_VERSION}
       depends_on:
         - jobmanager
       environment:
         - |
           FLINK_PROPERTIES=
           jobmanager.rpc.address: jobmanager
           rest.address: jobmanager
       entrypoint: ["sh", "-c", "cp -v /tmp/lib/*.jar /opt/flink/lib && exec /docker-entrypoint.sh bin/sql-client.sh"]
       volumes:
         - ./lib:/tmp/lib
         - fluss-remote-data:/remote-data

   volumes:
     fluss-remote-data:
   ```

5. Start the environment.
   ```shell
   docker compose up -d
   ```
   Once the containers are running, the Flink Web UI is available at [http://localhost:8081](http://localhost:8081).

6. Launch the Flink SQL Client.
   ```shell
   # The container entrypoint copies the JARs from ./lib into the Flink
   # classpath before starting the SQL client.
   docker compose run sql-client
   ```

## Step 1: Create the Fluss Catalog

In the SQL Client, create and use the Fluss catalog.

:::tip
Run the SQL statements one at a time to avoid errors.
:::

```sql
CREATE CATALOG fluss_catalog WITH (
    'type' = 'fluss',
    'bootstrap.servers' = 'coordinator-server:9123'
);
```

```sql
USE CATALOG fluss_catalog;
```

## Step 2: Create the User Dictionary Table

Create the `user_dict` table to map emails to UIDs. The `auto-increment.fields` option tells Fluss to generate unique IDs automatically.

```sql
CREATE TABLE user_dict (
    email STRING,
    uid INT,
    PRIMARY KEY (email) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'auto-increment.fields' = 'uid',
    'bucket.num' = '1'
);
```
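You can optionally sanity-check the mapping by hand before wiring up the pipeline. The statements below are an illustrative sketch: they assume the auto-increment feature assigns `uid` whenever the column is omitted from an insert, and the generated values in your session will differ.

```sql
-- Illustrative sanity check: insert emails without a uid and let Fluss
-- generate one for each new primary key (assumes auto-increment fills the
-- omitted column).
INSERT INTO user_dict (email) VALUES ('alice@example.com'), ('bob@example.com');

-- Read the dictionary back; each email should now carry a compact INT uid.
SELECT email, uid FROM user_dict;
```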
## Step 3: Create the Aggregated Profile Table

Create the `user_profiles` table using the **Aggregation Merge Engine**. This pushes the aggregation logic (summing clicks, unioning bitmaps) down to the Fluss TabletServers.

:::tip
We use `rbm64` ([RoaringBitmap](https://roaringbitmap.org/)) for efficient unique visitor counting. It stores millions of unique IDs in a highly compressed format.
:::

```sql
CREATE TABLE user_profiles (
    profile_id INT,
    unique_visitors BYTES,
    total_clicks BIGINT,
    PRIMARY KEY (profile_id) NOT ENFORCED
) WITH (
    'connector' = 'fluss',
    'table.merge-engine' = 'aggregation',
    'fields.unique_visitors.agg' = 'rbm64',
    'fields.total_clicks.agg' = 'sum',
    'bucket.num' = '1'
);
```

With this merge engine, repeated writes to the same key are combined in storage rather than overwritten: for example, two writes for `profile_id = 1` carrying `total_clicks` values of `5` and `3` leave a single row with `total_clicks = 8`, while the `unique_visitors` bitmaps of the two writes are unioned.

## Step 4: Ingest and Process Data

Create a temporary source table that simulates raw user events using the Faker connector.

```sql
CREATE TEMPORARY TABLE raw_events (
    email STRING,
    click_count INT,
    profile_group_id INT,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'faker',
    'rows-per-second' = '1',
    'fields.email.expression' = '#{internet.emailAddress}',
    'fields.click_count.expression' = '#{number.numberBetween ''1'',''10''}',
    'fields.profile_group_id.expression' = '#{number.numberBetween ''1'',''5''}'
);
```

Now, run the pipeline. The `lookup.insert-if-not-exists` hint ensures that if an email is not found in `user_dict`, Fluss generates a new `uid` on the fly.

```sql
INSERT INTO user_profiles
SELECT
    d.uid,
    -- Convert INT to BYTES for rbm64.
    -- Note: in a real production job, you might use a UDF to ensure correct bitmap initialization.
    CAST(CAST(d.uid AS STRING) AS BYTES),
```

Review Comment:

> I believe we need the `to_rbm` and `from_rbm` Flink UDFs to process the data correctly. Without these functions, the results would be meaningless and users would not understand the purpose of this feature.

Exactly. We need functions such as `RB_CARDINALITY` and `RB_OR_AGG` to aggregate the result bitmap.
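To make that concrete, here is a sketch of what a read query could look like once such UDFs exist. The function names are hypothetical, borrowed from the suggestion above; neither Fluss nor Flink ships them as built-ins today.

```sql
-- Hypothetical sketch: RB_CARDINALITY and RB_OR_AGG are the UDFs proposed in
-- this thread, not existing Fluss or Flink built-ins.

-- Per-profile unique visitor counts, decoding the rbm64 bitmap column:
SELECT
    profile_id,
    RB_CARDINALITY(unique_visitors) AS unique_visitor_count,
    total_clicks
FROM user_profiles;

-- A global UV count, unioning all profile bitmaps before counting:
SELECT RB_CARDINALITY(RB_OR_AGG(unique_visitors)) AS overall_uv
FROM user_profiles;
```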
