This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.9 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 7a11e092c874f0d36bc4488c5ee82fc565c39c64 Author: Yann Byron <[email protected]> AuthorDate: Thu Feb 12 19:51:41 2026 +0800 [spark] Introduce Spark connector documentation (#2668) --- website/docs/engine-flink/datastream.mdx | 2 +- website/docs/engine-flink/delta-joins.md | 2 +- website/docs/engine-flink/getting-started.md | 2 +- website/docs/engine-flink/options.md | 2 +- website/docs/engine-flink/procedures.md | 2 +- website/docs/engine-spark/ddl.md | 253 +++++++++++++++++++++ website/docs/engine-spark/getting-started.md | 207 +++++++++++++++++ website/docs/engine-spark/options.md | 19 ++ website/docs/engine-spark/procedures.md | 2 +- website/docs/engine-spark/reads.md | 259 ++++++++++++++++++++++ website/docs/engine-spark/structured-streaming.md | 144 ++++++++++++ website/docs/engine-spark/writes.md | 115 ++++++++++ website/docusaurus.config.ts | 2 +- 13 files changed, 1004 insertions(+), 7 deletions(-) diff --git a/website/docs/engine-flink/datastream.mdx b/website/docs/engine-flink/datastream.mdx index a578cdaac..3d2910a31 100644 --- a/website/docs/engine-flink/datastream.mdx +++ b/website/docs/engine-flink/datastream.mdx @@ -3,7 +3,7 @@ title: "DataStream API" sidebar_position: 8 --- -# DataStream API +# Flink DataStream API ## Overview The Fluss DataStream Connector for Apache Flink provides a Flink DataStream source implementation for reading data from Fluss tables and a Flink DataStream sink implementation for writing data to Fluss tables. It allows you to seamlessly integrate Fluss tables with Flink's DataStream API, enabling you to process data from Fluss in your Flink applications. 
diff --git a/website/docs/engine-flink/delta-joins.md b/website/docs/engine-flink/delta-joins.md index 8388b71c8..44f6f3746 100644 --- a/website/docs/engine-flink/delta-joins.md +++ b/website/docs/engine-flink/delta-joins.md @@ -4,7 +4,7 @@ title: Flink Delta Joins sidebar_position: 7 --- -# The Delta Join +# Flink Delta Join Beginning with **Apache Flink 2.1**, a new operator called [Delta Join](https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin) was introduced. Compared to traditional streaming joins, the delta join operator significantly reduces the amount of state that needs to be maintained during execution. This improvement helps mitigate several common issues associated with large state sizes, including: diff --git a/website/docs/engine-flink/getting-started.md b/website/docs/engine-flink/getting-started.md index 8c4e926f7..a63b12f77 100644 --- a/website/docs/engine-flink/getting-started.md +++ b/website/docs/engine-flink/getting-started.md @@ -58,7 +58,7 @@ tar -xzf flink-1.20.3-bin-scala_2.12.tgz Download [Fluss Flink Bundled jar](/downloads) and copy to the `lib` directory of your Flink home. 
```shell -cp fluss-flink-$FLUSS_VERSION$.jar <FLINK_HOME>/lib/ +cp fluss-flink-1.20-$FLUSS_VERSION$.jar <FLINK_HOME>/lib/ ``` :::note If you use [Amazon S3](http://aws.amazon.com/s3/), [Aliyun OSS](https://www.aliyun.com/product/oss) or [HDFS(Hadoop Distributed File System)](https://hadoop.apache.org/docs/stable/) as Fluss's [remote storage](maintenance/tiered-storage/remote-storage.md), diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index 2ed7f5a44..200c8bb5c 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -3,7 +3,7 @@ title: Connector Options sidebar_position: 9 --- -# Connector Options +# Flink Connector Options The following table lists the options that can be used to configure the Fluss table in Flink, including the storage format, the read/write behaviors. diff --git a/website/docs/engine-flink/procedures.md b/website/docs/engine-flink/procedures.md index 9cd9cb1d6..b3035e836 100644 --- a/website/docs/engine-flink/procedures.md +++ b/website/docs/engine-flink/procedures.md @@ -4,7 +4,7 @@ title: Procedures sidebar_position: 3 --- -# Procedures +# Flink Procedures Fluss provides stored procedures to perform administrative and management operations through Flink SQL. All procedures are located in the `sys` namespace and can be invoked using the `CALL` statement. diff --git a/website/docs/engine-spark/ddl.md b/website/docs/engine-spark/ddl.md new file mode 100644 index 000000000..0eee2d3ed --- /dev/null +++ b/website/docs/engine-spark/ddl.md @@ -0,0 +1,253 @@ +--- +sidebar_label: DDL +title: Spark DDL +sidebar_position: 2 +--- + +# Spark DDL + +## Create Catalog + +Fluss supports creating and managing tables through the Fluss Catalog in Spark. The catalog is configured via Spark's catalog plugin mechanism. 
+ +```properties title="spark-defaults.conf" +spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog +spark.sql.catalog.fluss_catalog.bootstrap.servers=fluss-server-1:9123 +``` + +```sql title="Spark SQL" +USE fluss_catalog; +``` + +The following properties can be set if using the Fluss catalog: + +| Option | Required | Default | Description | +|---------------------|----------|---------|--------------------------------------------------------------------------------| +| bootstrap.servers | required | (none) | Comma-separated list of Fluss servers. | + +The following statements assume that the current catalog has been switched to the Fluss catalog using the `USE fluss_catalog` statement. + +## Create Database + +By default, the Fluss catalog uses the `fluss` database. You can use the following example to create a separate database: + +```sql title="Spark SQL" +CREATE DATABASE my_db COMMENT 'created by spark'; +``` + +```sql title="Spark SQL" +USE my_db; +``` + +## Drop Database + +To delete a database (note that this drops all tables in the database as well): + +```sql title="Spark SQL" +-- Switch to another database first +USE fluss; +``` + +```sql title="Spark SQL" +-- Drop the database +DROP DATABASE my_db; +``` + +## Create Table + +### Log Table + +The following SQL statement creates a Log Table by omitting the `primary.key` property. + +```sql title="Spark SQL" +CREATE TABLE my_log_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING +); +``` + +### Primary Key Table + +The following SQL statement creates a Primary Key Table by specifying `primary.key` in `TBLPROPERTIES`. + +```sql title="Spark SQL" +CREATE TABLE my_pk_table ( + shop_id BIGINT, + user_id BIGINT, + num_orders INT, + total_amount INT +) TBLPROPERTIES ( + 'primary.key' = 'shop_id,user_id', + 'bucket.num' = '4' +); +``` + +### Partitioned Table + +The following SQL statement creates a Partitioned Log Table in Fluss.
+ +```sql title="Spark SQL" +CREATE TABLE my_part_log_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING, + dt STRING +) PARTITIONED BY (dt); +``` + +The following SQL statement creates a Partitioned Primary Key Table in Fluss. + +```sql title="Spark SQL" +CREATE TABLE my_part_pk_table ( + id INT, + name STRING, + pt STRING +) PARTITIONED BY (pt) TBLPROPERTIES ( + 'primary.key' = 'id,pt' +); +``` + +:::note +1. Currently, Fluss only supports partition fields of `STRING` type. +2. For a Partitioned Primary Key Table, the partition fields must be a subset of the primary key. +::: + +#### Multi-Field Partitioned Table + +Fluss also supports multi-field partitioning. The following SQL statement creates a Multi-Field Partitioned Table: + +```sql title="Spark SQL" +CREATE TABLE my_multi_part_table ( + id INT, + name STRING, + pt1 STRING, + pt2 INT +) PARTITIONED BY (pt1, pt2); +``` + +### Table Properties + +The following table properties can be specified when creating a table: + +| Property | Required | Description | +|--------------|----------|--------------------------------------------------------------------------------| +| primary.key | optional | The primary keys of the Fluss table. Multiple columns are separated by commas (e.g., `'col1,col2'`). | +| bucket.key | optional | The distribution key of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. Must be a subset of the primary keys (excluding partition keys). If not specified, defaults to the primary key (excluding partition keys) for PK tables, or random distribution for Log tables. | +| bucket.num | optional | The number of buckets of the Fluss table.
| + +You can also pass additional custom properties and Fluss storage options through `TBLPROPERTIES`: + +```sql title="Spark SQL" +CREATE TABLE my_table ( + id INT, + name STRING +) TBLPROPERTIES ( + 'primary.key' = 'id', + 'bucket.num' = '4', + 'key1' = 'value1' +); +``` + +## Describe Table + +To describe the schema of a table: + +```sql title="Spark SQL" +DESC my_table; +``` + +## Show Tables + +To list all tables in the current database: + +```sql title="Spark SQL" +SHOW TABLES; +``` + +You can also specify a database and use patterns: + +```sql title="Spark SQL" +-- Show tables in a specific database +SHOW TABLES IN fluss; + +-- Show tables matching a pattern +SHOW TABLES FROM fluss LIKE 'test_*'; +``` + +## Drop Table + +To delete a table, run: + +```sql title="Spark SQL" +DROP TABLE my_table; +``` + +This permanently removes all data of the table from the Fluss cluster. + +## Alter Table + +### SET Properties + +The `SET TBLPROPERTIES` statement allows users to add or modify table properties: + +```sql title="Spark SQL" +ALTER TABLE my_table SET TBLPROPERTIES ('key1' = 'value1', 'key2' = 'value2'); +``` + +### UNSET Properties + +The `UNSET TBLPROPERTIES` statement allows users to remove table properties: + +```sql title="Spark SQL" +ALTER TABLE my_table UNSET TBLPROPERTIES ('key1', 'key2'); +``` + +:::note +Most table properties with the `table.` prefix cannot be modified.
+::: + +## Show Partitions + +To show all the partitions of a partitioned table, run: + +```sql title="Spark SQL" +SHOW PARTITIONS my_part_table; +``` + +For multi-field partitioned tables, you can use the `SHOW PARTITIONS` command with a partial partition filter to list matching partitions: + +```sql title="Spark SQL" +-- Show partitions using a partial partition filter +SHOW PARTITIONS my_multi_part_table PARTITION (pt1 = 'a'); +``` + +## Add Partition + +Fluss supports manually adding partitions to an existing partitioned table: + +```sql title="Spark SQL" +-- Add a partition +ALTER TABLE my_multi_part_table ADD PARTITION (pt1 = 'b', pt2 = 1); + +-- Add a partition if not exists +ALTER TABLE my_multi_part_table ADD IF NOT EXISTS PARTITION (pt1 = 'b', pt2 = 1); +``` + +## Drop Partition + +To drop a partition from a partitioned table: + +```sql title="Spark SQL" +-- Drop a partition +ALTER TABLE my_multi_part_table DROP PARTITION (pt1 = 'a', pt2 = 2); + +-- Drop a partition if exists +ALTER TABLE my_multi_part_table DROP IF EXISTS PARTITION (pt1 = 'a', pt2 = 2); +``` + +:::note +Spark does not support dropping partial partitions. You must specify all partition fields when dropping a partition. +::: diff --git a/website/docs/engine-spark/getting-started.md b/website/docs/engine-spark/getting-started.md new file mode 100644 index 000000000..e93fe0c86 --- /dev/null +++ b/website/docs/engine-spark/getting-started.md @@ -0,0 +1,207 @@ +--- +sidebar_label: "Getting Started" +title: "Getting Started with Spark" +sidebar_position: 1 +--- + +# Getting Started with Spark Engine + +## Supported Spark Versions +| Fluss Connector Versions | Supported Spark Versions | +|--------------------------|--------------------------| +| $FLUSS_VERSION_SHORT$ | 3.4, 3.5 | + + +## Feature Support +Fluss supports Apache Spark's SQL API and Spark Structured Streaming. 
+ +| Feature Support | Spark | Notes | +|------------------------------------------------------|-------|---------------------------------------------| +| [SQL Create Catalog](ddl.md#create-catalog) | ✔️ | | +| [SQL Create Database](ddl.md#create-database) | ✔️ | | +| [SQL Drop Database](ddl.md#drop-database) | ✔️ | | +| [SQL Create Table](ddl.md#create-table) | ✔️ | | +| [SQL Drop Table](ddl.md#drop-table) | ✔️ | | +| [SQL Describe Table](ddl.md#describe-table) | ✔️ | | +| [SQL Show Tables](ddl.md#show-tables) | ✔️ | | +| [SQL Alter Table](ddl.md#alter-table) | ✔️ | SET/UNSET TBLPROPERTIES | +| [SQL Show Partitions](ddl.md#show-partitions) | ✔️ | | +| [SQL Add Partition](ddl.md#add-partition) | ✔️ | | +| [SQL Drop Partition](ddl.md#drop-partition) | ✔️ | | +| [SQL Select (Batch)](reads.md) | ✔️ | Log table and primary-key table | +| [SQL Insert Into](writes.md) | ✔️ | Log table and primary-key table | +| [Structured Streaming Read](structured-streaming.md#streaming-read) | ✔️ | Log table and primary-key table | +| [Structured Streaming Write](structured-streaming.md#streaming-write) | ✔️ | Log table and primary-key table | + + +## Preparation when using Spark SQL + +- **Download Spark** + +Spark runs on UNIX-like environments such as Linux and macOS. You can download the binary release of Spark from the [Apache Spark Downloads](https://spark.apache.org/downloads.html) page, then extract the archive: + +```shell +tar -xzf spark-3.5.7-bin-hadoop3.tgz +``` + +- **Copy Fluss Spark Bundled Jar** + +Download the [Fluss Spark Bundled jar](/downloads) and copy it to the `jars` directory of your Spark home.
+ +```shell +cp fluss-spark-3.5_2.12-$FLUSS_VERSION$.jar <SPARK_HOME>/jars/ +``` + +- **Start Spark SQL** + +To quickly start the Spark SQL CLI, you can use the provided script: + +```shell +<SPARK_HOME>/bin/spark-sql \ + --conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \ + --conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \ + --conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions +``` + +Or start Spark Shell: + +```shell +<SPARK_HOME>/bin/spark-shell \ + --conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \ + --conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \ + --conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions +``` + +## Creating a Catalog + +The Fluss catalog can be configured in `spark-defaults.conf` or passed as command-line arguments. + +Using `spark-defaults.conf`: + +```properties +spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog +spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 +spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions +``` + +Or configure programmatically in Scala/Python: + +```scala +val spark = SparkSession.builder() + .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog") + .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", "localhost:9123") + .config("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions") + .getOrCreate() +``` + +:::note +1. The `spark.sql.catalog.fluss_catalog.bootstrap.servers` option specifies the Fluss server address. Before configuring `bootstrap.servers`, make sure the Fluss cluster is running. See [Deploying Fluss](install-deploy/overview.md#how-to-deploy-fluss) for how to set up a Fluss cluster. Here, it is assumed that a Fluss cluster is running on your local machine and the CoordinatorServer port is 9123. +2.
The `spark.sql.catalog.fluss_catalog.bootstrap.servers` configuration is used to discover all nodes within the Fluss cluster. It can be set with one or more (up to three) Fluss server addresses (either CoordinatorServer or TabletServer) separated by commas. +::: + +## Creating a Database + +```sql title="Spark SQL" +USE fluss_catalog; +``` + +```sql title="Spark SQL" +CREATE DATABASE fluss_db; +USE fluss_db; +``` + +## Creating a Table + +```sql title="Spark SQL" +CREATE TABLE pk_table ( + shop_id BIGINT, + user_id BIGINT, + num_orders INT, + total_amount INT +) TBLPROPERTIES ( + 'primary.key' = 'shop_id,user_id', + 'bucket.num' = '4' +); +``` + +## Data Writing + +To append new data to a table, you can use `INSERT INTO`: + +```sql title="Spark SQL" +INSERT INTO pk_table VALUES + (1234, 1234, 1, 1), + (12345, 12345, 2, 2), + (123456, 123456, 3, 3); +``` + +## Data Reading + +To retrieve data, you can use a `SELECT` statement: + +```sql title="Spark SQL" +SELECT * FROM pk_table ORDER BY shop_id; +``` + +To preview a subset of data from a log table with projection and filter: + +```sql title="Spark SQL" +SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1; +``` + + +## Type Conversion + +Fluss's integration for Spark automatically converts between Spark and Fluss types. + +### Fluss -> Apache Spark + +| Fluss | Spark | +|---------------|------------------| +| BOOLEAN | BooleanType | +| TINYINT | ByteType | +| SMALLINT | ShortType | +| INT | IntegerType | +| BIGINT | LongType | +| FLOAT | FloatType | +| DOUBLE | DoubleType | +| CHAR | CharType | +| STRING | StringType | +| DECIMAL | DecimalType | +| DATE | DateType | +| TIMESTAMP | TimestampNTZType | +| TIMESTAMP_LTZ | TimestampType | +| BYTES | BinaryType | +| ARRAY | ArrayType | +| MAP | MapType | +| ROW | StructType | + +:::note +The `MAP` type is currently supported for table creation and schema mapping, but **read and write operations on MAP columns are not yet supported**. 
Full MAP type read/write support will be available soon. +::: + +### Apache Spark -> Fluss + +| Spark | Fluss | +|------------------|-----------------------------------------------| +| BooleanType | BOOLEAN | +| ByteType | TINYINT | +| ShortType | SMALLINT | +| IntegerType | INT | +| LongType | BIGINT | +| FloatType | FLOAT | +| DoubleType | DOUBLE | +| CharType | CHAR | +| StringType | STRING | +| VarcharType | STRING | +| DecimalType | DECIMAL | +| DateType | DATE | +| TimestampType | TIMESTAMP_LTZ | +| TimestampNTZType | TIMESTAMP | +| BinaryType | BYTES | +| ArrayType | ARRAY | +| MapType | MAP (read/write not yet supported) | +| StructType | ROW | diff --git a/website/docs/engine-spark/options.md b/website/docs/engine-spark/options.md new file mode 100644 index 000000000..b1f74cbc5 --- /dev/null +++ b/website/docs/engine-spark/options.md @@ -0,0 +1,19 @@ +--- +sidebar_label: Connector Options +title: Spark Connector Options +sidebar_position: 7 +--- + +# Spark Connector Options + +This page lists all the available options for the Fluss Spark connector. + +## Read Options + +The following Spark configurations can be used to control read behavior for both batch and streaming reads. These options are set using `SET` in Spark SQL or via `spark.conf.set()` in Spark applications. All options are prefixed with `spark.sql.fluss.`. + +| Option | Default | Description | +|--------|---------|-------------| +| `spark.sql.fluss.scan.startup.mode` | `full` | The startup mode when reading a Fluss table. Supported values: <ul><li>`full` (default): For primary key tables, reads the full snapshot and merges with log changes. For log tables, reads from the earliest offset.</li><li>`earliest`: Reads from the earliest log/changelog offset.</li><li>`latest`: Reads from the latest log/changelog offset.</li><li>`timestamp`: Reads from a specified timestamp (requires `scan.startup.timestamp`).</li></ul>* [...] 
+| `spark.sql.fluss.read.optimized` | `false` | If `true`, Spark will only read data from the data lake snapshot or KV snapshot, without merging log changes. This can improve read performance but may return stale data for primary key tables. | +| `spark.sql.fluss.scan.poll.timeout` | `10000ms` | The timeout for the log scanner to poll records. | diff --git a/website/docs/engine-spark/procedures.md b/website/docs/engine-spark/procedures.md index 0d8b01047..ace2d3a6d 100644 --- a/website/docs/engine-spark/procedures.md +++ b/website/docs/engine-spark/procedures.md @@ -4,7 +4,7 @@ title: Procedures sidebar_position: 3 --- -# Procedures +# Spark Procedures Fluss provides stored procedures to perform administrative and management operations through Spark SQL. All procedures are located in the `sys` namespace and can be invoked using the `CALL` statement. diff --git a/website/docs/engine-spark/reads.md b/website/docs/engine-spark/reads.md new file mode 100644 index 000000000..26cc1c5fd --- /dev/null +++ b/website/docs/engine-spark/reads.md @@ -0,0 +1,259 @@ +--- +sidebar_label: Reads +title: Spark Reads +sidebar_position: 5 +--- + +# Spark Reads + +Fluss supports batch read with [Apache Spark](https://spark.apache.org/)'s SQL API for both Log Tables and Primary Key Tables. + +:::tip +For streaming read, see the [Structured Streaming Read](structured-streaming.md#streaming-read) section. +::: + +## Limitations + +:::caution +- For tables with datalake enabled (`table.datalake.enabled = true`), batch read can only read data stored in the Fluss cluster and cannot read data that has been tiered to the underlying lake storage (e.g., Paimon, Iceberg). Reading the full dataset including lake data will be supported in a future release. +::: + +## Batch Read + +### Log Table + +You can read data from a log table using the `SELECT` statement. + +#### Example + +1. 
Create a table and prepare data: + +```sql title="Spark SQL" +CREATE TABLE log_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING +); +``` + +```sql title="Spark SQL" +INSERT INTO log_table VALUES + (600, 21, 601, 'addr1'), + (700, 22, 602, 'addr2'), + (800, 23, 603, 'addr3'), + (900, 24, 604, 'addr4'), + (1000, 25, 605, 'addr5'); +``` + +2. Query data: + +```sql title="Spark SQL" +SELECT * FROM log_table ORDER BY order_id; +``` + +#### Projection + +Column projection minimizes I/O by reading only the columns used in a query: + +```sql title="Spark SQL" +SELECT address, item_id FROM log_table ORDER BY order_id; +``` + +#### Filter + +Filters are applied to reduce the amount of data read: + +```sql title="Spark SQL" +SELECT * FROM log_table WHERE amount % 2 = 0 ORDER BY order_id; +``` + +#### Projection + Filter + +Projection and filter can be combined for efficient queries: + +```sql title="Spark SQL" +SELECT order_id, item_id FROM log_table +WHERE order_id >= 900 ORDER BY order_id; +``` + +### Partitioned Log Table + +Reading from partitioned log tables supports partition filtering: + +```sql title="Spark SQL" +CREATE TABLE part_log_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING, + dt STRING +) PARTITIONED BY (dt); +``` + +```sql title="Spark SQL" +INSERT INTO part_log_table VALUES + (600, 21, 601, 'addr1', '2026-01-01'), + (700, 22, 602, 'addr2', '2026-01-01'), + (800, 23, 603, 'addr3', '2026-01-02'), + (900, 24, 604, 'addr4', '2026-01-02'), + (1000, 25, 605, 'addr5', '2026-01-03'); +``` + +```sql title="Spark SQL" +-- Read with partition filter +SELECT * FROM part_log_table WHERE dt = '2026-01-01' ORDER BY order_id; +``` + +```sql title="Spark SQL" +-- Read with multiple partitions filter +SELECT order_id, address, dt FROM part_log_table +WHERE dt IN ('2026-01-01', '2026-01-02') +ORDER BY order_id; +``` + +### Primary Key Table + +The Fluss source supports batch read for primary-key tables. 
It reads data from the latest snapshot and merges it with log changes to provide the most up-to-date view. + +#### Example + +1. Create a table and prepare data: + +```sql title="Spark SQL" +CREATE TABLE pk_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING +) TBLPROPERTIES ( + 'primary.key' = 'order_id', + 'bucket.num' = '1' +); +``` + +```sql title="Spark SQL" +INSERT INTO pk_table VALUES + (600, 21, 601, 'addr1'), + (700, 22, 602, 'addr2'), + (800, 23, 603, 'addr3'), + (900, 24, 604, 'addr4'), + (1000, 25, 605, 'addr5'); +``` + +2. Query data: + +```sql title="Spark SQL" +SELECT * FROM pk_table ORDER BY order_id; +``` + +3. After upsert, the query reflects the latest values: + +```sql title="Spark SQL" +-- Upsert data +INSERT INTO pk_table VALUES + (700, 220, 602, 'addr2'), + (900, 240, 604, 'addr4'), + (1100, 260, 606, 'addr6'); +``` + +```sql title="Spark SQL" +-- Query reflects the latest data +SELECT order_id, item_id, address FROM pk_table +WHERE amount <= 603 ORDER BY order_id; +``` + +### Partitioned Primary Key Table + +Reading from partitioned primary key tables also supports partition filtering: + +```sql title="Spark SQL" +CREATE TABLE part_pk_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING, + dt STRING +) PARTITIONED BY (dt) TBLPROPERTIES ( + 'primary.key' = 'order_id,dt', + 'bucket.num' = '1' +); +``` + +```sql title="Spark SQL" +INSERT INTO part_pk_table VALUES + (600, 21, 601, 'addr1', '2026-01-01'), + (700, 22, 602, 'addr2', '2026-01-01'), + (800, 23, 603, 'addr3', '2026-01-02'), + (900, 24, 604, 'addr4', '2026-01-02'), + (1000, 25, 605, 'addr5', '2026-01-03'); +``` + +```sql title="Spark SQL" +-- Read with partition filter +SELECT * FROM part_pk_table +WHERE dt = '2026-01-01' +ORDER BY order_id; +``` + +```sql title="Spark SQL" +-- Read with multiple partition filters +SELECT * FROM part_pk_table +WHERE dt IN ('2026-01-01', '2026-01-02') +ORDER BY order_id; +``` + +## All Data Types + +Fluss 
Spark connector supports reading all Fluss data types including nested types: + +:::note +The `MAP` type is currently **not supported** for read operations. Full MAP type read support will be available soon. +::: + +```sql title="Spark SQL" +CREATE TABLE all_types_table ( + id INT, + flag BOOLEAN, + small SHORT, + value INT, + big BIGINT, + real FLOAT, + amount DOUBLE, + name STRING, + decimal_val DECIMAL(10, 2), + date_val DATE, + timestamp_ntz_val TIMESTAMP, + timestamp_ltz_val TIMESTAMP_LTZ, + arr ARRAY<INT>, + struct_col STRUCT<col1: INT, col2: STRING> +); +``` + +```sql title="Spark SQL" +INSERT INTO all_types_table VALUES + (1, true, 100, 1000, 10000, 12.34, 56.78, 'string_val', + 123.45, DATE '2026-01-01', TIMESTAMP '2026-01-01 12:00:00', TIMESTAMP '2026-01-01 12:00:00', + ARRAY(1, 2, 3), STRUCT(100, 'nested_value')), + (2, false, 200, 2000, 20000, 23.45, 67.89, 'another_str', + 223.45, DATE '2026-01-02', TIMESTAMP '2026-01-02 12:00:00', TIMESTAMP '2026-01-02 12:00:00', + ARRAY(4, 5, 6), STRUCT(200, 'nested_value2')); +``` + +```sql title="Spark SQL" +SELECT * FROM all_types_table; +``` + +## Read Optimized Mode + +For primary key tables, Fluss by default reads the latest snapshot and merges it with log changes to return the most up-to-date data. You can enable **read-optimized mode** to skip the merge step and read only snapshot data, which improves query performance at the cost of data freshness. + +```sql title="Spark SQL" +-- Enable read-optimized mode for primary key tables +SET spark.sql.fluss.read.optimized=true; + +-- Query returns only snapshot data (may be stale) +SELECT * FROM pk_table; +``` + +For more details on all available read options, see the [Connector Options](options.md#read-options) page. 
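 +Batch reads can also be issued through the DataFrame API rather than SQL. The following is a minimal Scala sketch, assuming a session with the `fluss_catalog` configured as described in [Getting Started](getting-started.md) and the `pk_table` created above: + +```scala title="Spark Application" +import org.apache.spark.sql.functions.col + +// Batch read of a Fluss primary key table via the DataFrame API. +// The filter and column projection below mirror the SQL examples above +// and are pushed down to the source where possible. +val df = spark.read + .table("fluss_catalog.fluss.pk_table") + .where(col("amount") <= 603) + .select("order_id", "item_id", "address") + +df.show() + +// Read-optimized mode can also be toggled programmatically +// (snapshot-only data, which may be stale for primary key tables): +spark.conf.set("spark.sql.fluss.read.optimized", "true") +``` + +This is a sketch under the configuration shown earlier, not an additional API surface: the connector is exercised entirely through standard `spark.read.table` and `spark.conf.set` calls.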
diff --git a/website/docs/engine-spark/structured-streaming.md b/website/docs/engine-spark/structured-streaming.md new file mode 100644 index 000000000..4fcf2fafd --- /dev/null +++ b/website/docs/engine-spark/structured-streaming.md @@ -0,0 +1,144 @@ +--- +sidebar_label: Structured Streaming +title: Spark Structured Streaming +sidebar_position: 6 +--- + +# Spark Structured Streaming + +Fluss supports [Spark Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) for both reading and writing data in a streaming fashion. This enables building real-time data pipelines with Apache Spark and Fluss. + +## Streaming Write + +Fluss supports writing data to Fluss tables using Spark Structured Streaming. This is useful for continuously ingesting data into Fluss tables from streaming sources. + +### Write to Log Table + +```scala title="Spark Application" +// Here we use MemoryStream to fake a streaming source. +val inputData = MemoryStream[(Int, String)] +val inputDF = inputData.toDS().toDF("k", "v") + +val query = inputDF.writeStream + .option("checkpointLocation", "/path/to/checkpoint") + .toTable("fluss_catalog.fluss.log_table") + +query.awaitTermination() +``` + +### Write to Primary Key Table + +When writing to a primary key table, if a record with the same primary key already exists, it will be updated (upsert semantics): + +```scala title="Spark Application" +// Here we use MemoryStream to fake a streaming source. +val inputData = MemoryStream[(Int, String)] +val inputDF = inputData.toDS().toDF("k", "v") + +val query = inputDF.writeStream + .option("checkpointLocation", "/path/to/checkpoint") + .toTable("fluss_catalog.fluss.pk_table") + +query.awaitTermination() +``` + +:::note +Fluss supports exactly-once semantics for streaming writes through Spark's checkpoint mechanism. Make sure to specify a `checkpointLocation` for fault tolerance. 
+::: + +## Streaming Read + +Fluss supports reading data from Fluss tables using Spark Structured Streaming. The streaming source continuously reads new data as it arrives. + +:::caution Limitations +- Streaming read currently only supports the `latest` startup mode. Other modes (`full`, `earliest`, `timestamp`) are not yet supported and will be available in a future release. +::: + +### Read from Log Table + +```scala title="Spark Application" +val df = spark.readStream + .option("scan.startup.mode", "latest") + .table("fluss_catalog.fluss.log_table") + +val query = df.writeStream + .format("console") + .start() + +query.awaitTermination() +``` + +### Read from Primary Key Table + +```scala title="Spark Application" +val df = spark.readStream + .option("scan.startup.mode", "latest") + .table("fluss_catalog.fluss.pk_table") + +val query = df.writeStream + .format("console") + .start() + +query.awaitTermination() +``` + +## Trigger Modes + +Fluss Spark streaming source supports the following Spark trigger modes: + +| Trigger Mode | Description | +|-----------------------------------|-----------------------------------------------------------------------------| +| Default (micro-batch) | Processes data as soon as new data is available. | +| `Trigger.ProcessingTime(interval)` | Processes data at fixed time intervals. 
| + +**Example:** + +```scala title="Spark Application" +import org.apache.spark.sql.streaming.Trigger + +val df = spark.readStream + .option("scan.startup.mode", "latest") + .table("fluss_catalog.fluss.my_table") + +// Processing time trigger (every 5 seconds) +val query = df.writeStream + .format("console") + .trigger(Trigger.ProcessingTime("5 seconds")) + .start() + +query.awaitTermination() +``` + +## End-to-End Example + +Here is a complete example that reads data from one Fluss table and writes to another: + +```scala title="Spark Application" +import org.apache.spark.sql.SparkSession + +val spark = SparkSession.builder() + .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog") + .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", "localhost:9123") + .config("spark.sql.defaultCatalog", "fluss_catalog") + .config("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions") + .getOrCreate() + +// Create source and sink tables +spark.sql("CREATE TABLE IF NOT EXISTS source_table (id INT, data STRING)") +spark.sql(""" + CREATE TABLE IF NOT EXISTS sink_table (id INT, data STRING) + TBLPROPERTIES ('primary.key' = 'id') +""") + +// Read from source table +val sourceDF = spark.readStream + .option("scan.startup.mode", "latest") + .table("fluss_catalog.fluss.source_table") + +// Write to sink table +val query = sourceDF.writeStream + .option("checkpointLocation", "/tmp/fluss-streaming-checkpoint") + .toTable("fluss_catalog.fluss.sink_table") + +query.awaitTermination() +``` diff --git a/website/docs/engine-spark/writes.md b/website/docs/engine-spark/writes.md new file mode 100644 index 000000000..3036855f9 --- /dev/null +++ b/website/docs/engine-spark/writes.md @@ -0,0 +1,115 @@ +--- +sidebar_label: Writes +title: Spark Writes +sidebar_position: 4 +--- + +# Spark Writes + +You can directly insert data into a Fluss table using the `INSERT INTO` statement. 
+Fluss primary key tables support upsert semantics (inserting a row with an existing primary key updates the existing record), while Fluss log tables only accept append-only writes. + +## INSERT INTO + +`INSERT INTO` statements are used to write data to Fluss tables in batch mode. +They are compatible with primary-key tables (for upserting data) as well as log tables (for appending data). + +### Appending Data to the Log Table + +#### Create a Log Table + +```sql title="Spark SQL" +CREATE TABLE log_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING +); +``` + +#### Insert Data into the Log Table + +```sql title="Spark SQL" +INSERT INTO log_table VALUES + (600, 21, 601, 'addr1'), + (700, 22, 602, 'addr2'), + (800, 23, 603, 'addr3'), + (900, 24, 604, 'addr4'), + (1000, 25, 605, 'addr5'); +``` + +### Upserting Data into the Primary Key Table + +#### Create a Primary Key Table + +```sql title="Spark SQL" +CREATE TABLE pk_table ( + order_id BIGINT, + item_id BIGINT, + amount INT, + address STRING +) TBLPROPERTIES ( + 'primary.key' = 'order_id', + 'bucket.num' = '1' +); +``` + +#### Insert Data + +```sql title="Spark SQL" +INSERT INTO pk_table VALUES + (600, 21, 601, 'addr1'), + (700, 22, 602, 'addr2'), + (800, 23, 603, 'addr3'); +``` + +#### Upsert Data + +When inserting data with the same primary key, the existing record will be updated: + +```sql title="Spark SQL" +-- This will update the records with order_id 700 and 800 +INSERT INTO pk_table VALUES + (700, 220, 602, 'addr2'), + (800, 230, 603, 'addr3'); +``` + +### All Data Types + +The Fluss Spark connector supports all Fluss data types, including nested types. Here is an example of writing various data types: + +:::note +The `MAP` type is currently **not supported** for write operations. Full MAP type write support will be available soon.
+::: + +```sql title="Spark SQL" +CREATE TABLE all_types_table ( + bool_col BOOLEAN, + tinyint_col BYTE, + smallint_col SHORT, + int_col INT, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + decimal_col DECIMAL(10, 2), + decimal_large_col DECIMAL(38, 2), + string_col STRING, + ts_col TIMESTAMP, + array_col ARRAY<FLOAT>, + struct_col STRUCT<id: BIGINT, name: STRING> +); +``` + +```sql title="Spark SQL" +INSERT INTO all_types_table VALUES ( + true, 1, 10, 100, 1000, 12.3, 45.6, + 1234567.89, 12345678900987654321.12, + 'test', + TIMESTAMP '2025-12-31 10:00:00', + array(11.11, 22.22), struct(123, 'apache fluss') +); +``` + +## See Also + +- [Structured Streaming Write](structured-streaming.md#streaming-write) for continuous streaming writes to Fluss tables. diff --git a/website/docusaurus.config.ts b/website/docusaurus.config.ts index 959c9e5c8..44a8ed0b6 100644 --- a/website/docusaurus.config.ts +++ b/website/docusaurus.config.ts @@ -227,7 +227,7 @@ const config: Config = { prism: { theme: prismThemes.vsDark, darkTheme: prismThemes.dracula, - additionalLanguages: ['java', 'bash'] + additionalLanguages: ['java', 'bash', 'scala'] }, algolia: { appId: "X8KSGGLJW1",
