This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new e698f6ad0 [spark] Introduce Spark connector documentation (#2668)
e698f6ad0 is described below
commit e698f6ad06bfd1199e7b6981c53dec4a5df065c5
Author: Yann Byron <[email protected]>
AuthorDate: Thu Feb 12 19:51:41 2026 +0800
[spark] Introduce Spark connector documentation (#2668)
---
website/docs/engine-flink/datastream.mdx | 2 +-
website/docs/engine-flink/delta-joins.md | 2 +-
website/docs/engine-flink/getting-started.md | 2 +-
website/docs/engine-flink/options.md | 2 +-
website/docs/engine-flink/procedures.md | 2 +-
website/docs/engine-spark/ddl.md | 253 +++++++++++++++++++++
website/docs/engine-spark/getting-started.md | 207 +++++++++++++++++
website/docs/engine-spark/options.md | 19 ++
website/docs/engine-spark/procedures.md | 2 +-
website/docs/engine-spark/reads.md | 259 ++++++++++++++++++++++
website/docs/engine-spark/structured-streaming.md | 144 ++++++++++++
website/docs/engine-spark/writes.md | 115 ++++++++++
website/docusaurus.config.ts | 2 +-
13 files changed, 1004 insertions(+), 7 deletions(-)
diff --git a/website/docs/engine-flink/datastream.mdx
b/website/docs/engine-flink/datastream.mdx
index a578cdaac..3d2910a31 100644
--- a/website/docs/engine-flink/datastream.mdx
+++ b/website/docs/engine-flink/datastream.mdx
@@ -3,7 +3,7 @@ title: "DataStream API"
sidebar_position: 8
---
-# DataStream API
+# Flink DataStream API
## Overview
The Fluss DataStream Connector for Apache Flink provides a Flink DataStream
source implementation for reading data from Fluss tables and a Flink DataStream
sink implementation for writing data to Fluss tables. It allows you to
seamlessly integrate Fluss tables with Flink's DataStream API, enabling you to
process data from Fluss in your Flink applications.
diff --git a/website/docs/engine-flink/delta-joins.md
b/website/docs/engine-flink/delta-joins.md
index 8388b71c8..44f6f3746 100644
--- a/website/docs/engine-flink/delta-joins.md
+++ b/website/docs/engine-flink/delta-joins.md
@@ -4,7 +4,7 @@ title: Flink Delta Joins
sidebar_position: 7
---
-# The Delta Join
+# Flink Delta Join
Beginning with **Apache Flink 2.1**, a new operator called [Delta
Join](https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A+Introduce+A+New+DeltaJoin)
was introduced.
Compared to traditional streaming joins, the delta join operator significantly
reduces the amount of state that needs to be maintained during execution. This
improvement helps mitigate several common issues associated with large state
sizes, including:
diff --git a/website/docs/engine-flink/getting-started.md
b/website/docs/engine-flink/getting-started.md
index 8c4e926f7..a63b12f77 100644
--- a/website/docs/engine-flink/getting-started.md
+++ b/website/docs/engine-flink/getting-started.md
@@ -58,7 +58,7 @@ tar -xzf flink-1.20.3-bin-scala_2.12.tgz
Download [Fluss Flink Bundled jar](/downloads) and copy to the `lib` directory
of your Flink home.
```shell
-cp fluss-flink-$FLUSS_VERSION$.jar <FLINK_HOME>/lib/
+cp fluss-flink-1.20-$FLUSS_VERSION$.jar <FLINK_HOME>/lib/
```
:::note
If you use [Amazon S3](http://aws.amazon.com/s3/), [Aliyun
OSS](https://www.aliyun.com/product/oss) or [HDFS(Hadoop Distributed File
System)](https://hadoop.apache.org/docs/stable/) as Fluss's [remote
storage](maintenance/tiered-storage/remote-storage.md),
diff --git a/website/docs/engine-flink/options.md
b/website/docs/engine-flink/options.md
index 2ed7f5a44..200c8bb5c 100644
--- a/website/docs/engine-flink/options.md
+++ b/website/docs/engine-flink/options.md
@@ -3,7 +3,7 @@ title: Connector Options
sidebar_position: 9
---
-# Connector Options
+# Flink Connector Options
The following table lists the options that can be used to configure the Fluss
table in Flink, including the storage format, the read/write behaviors.
diff --git a/website/docs/engine-flink/procedures.md
b/website/docs/engine-flink/procedures.md
index 9cd9cb1d6..b3035e836 100644
--- a/website/docs/engine-flink/procedures.md
+++ b/website/docs/engine-flink/procedures.md
@@ -4,7 +4,7 @@ title: Procedures
sidebar_position: 3
---
-# Procedures
+# Flink Procedures
Fluss provides stored procedures to perform administrative and management
operations through Flink SQL. All procedures are located in the `sys` namespace
and can be invoked using the `CALL` statement.
diff --git a/website/docs/engine-spark/ddl.md b/website/docs/engine-spark/ddl.md
new file mode 100644
index 000000000..0eee2d3ed
--- /dev/null
+++ b/website/docs/engine-spark/ddl.md
@@ -0,0 +1,253 @@
+---
+sidebar_label: DDL
+title: Spark DDL
+sidebar_position: 2
+---
+
+# Spark DDL
+
+## Create Catalog
+
+Fluss supports creating and managing tables through the Fluss Catalog in
Spark. The catalog is configured via Spark's catalog plugin mechanism.
+
+```properties title="spark-defaults.conf"
+spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog
+spark.sql.catalog.fluss_catalog.bootstrap.servers=fluss-server-1:9123
+```
+
+```sql title="Spark SQL"
+USE fluss_catalog;
+```
+
+The following properties can be set when using the Fluss catalog:
+
+| Option              | Required | Default | Description                            |
+|---------------------|----------|---------|----------------------------------------|
+| bootstrap.servers   | required | (none)  | Comma-separated list of Fluss servers. |
+
+The following statements assume that the current catalog has been switched to
the Fluss catalog using the `USE fluss_catalog` statement.
+
+## Create Database
+
+By default, FlussCatalog will use the `fluss` database. You can use the
following example to create a separate database:
+
+```sql title="Spark SQL"
+CREATE DATABASE my_db COMMENT 'created by spark';
+```
+
+```sql title="Spark SQL"
+USE my_db;
+```
+
+## Drop Database
+
+Dropping a database also drops all the tables in that database:
+
+```sql title="Spark SQL"
+-- Switch to another database first
+USE fluss;
+```
+
+```sql title="Spark SQL"
+-- Drop the database
+DROP DATABASE my_db;
+```
+
+## Create Table
+
+### Log Table
+
+The following SQL statement creates a Log Table by not specifying the `primary.key` property.
+
+```sql title="Spark SQL"
+CREATE TABLE my_log_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING
+);
+```
+
+### Primary Key Table
+
+The following SQL statement creates a Primary Key Table by specifying
`primary.key` in `TBLPROPERTIES`.
+
+```sql title="Spark SQL"
+CREATE TABLE my_pk_table (
+ shop_id BIGINT,
+ user_id BIGINT,
+ num_orders INT,
+ total_amount INT
+) TBLPROPERTIES (
+ 'primary.key' = 'shop_id,user_id',
+ 'bucket.num' = '4'
+);
+```
+
+### Partitioned Table
+
+The following SQL statement creates a Partitioned Log Table in Fluss.
+
+```sql title="Spark SQL"
+CREATE TABLE my_part_log_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING,
+ dt STRING
+) PARTITIONED BY (dt);
+```
+
+The following SQL statement creates a Partitioned Primary Key Table in Fluss.
+
+```sql title="Spark SQL"
+CREATE TABLE my_part_pk_table (
+ id INT,
+ name STRING,
+ pt STRING
+) PARTITIONED BY (pt) TBLPROPERTIES (
+ 'primary.key' = 'id,pt'
+);
+```
+
+:::note
+1. Currently, Fluss only supports partition fields of type `STRING`.
+2. For a Partitioned Primary Key Table, the partition fields must be a subset of the primary key.
+:::
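+
+The second rule can be checked before a `CREATE TABLE` statement is issued. The following is a minimal sketch in plain Scala; `partitionKeysValid` is a hypothetical helper written for illustration and is not part of the Fluss API:
+
+```scala
+// Hypothetical helper (not part of Fluss): checks that every partition
+// field also appears in the comma-separated 'primary.key' property.
+def partitionKeysValid(primaryKey: String, partitionFields: Seq[String]): Boolean = {
+  val pkColumns = primaryKey.split(",").map(_.trim).toSet
+  partitionFields.forall(pkColumns.contains)
+}
+
+// 'id,pt' contains the partition field 'pt', as in my_part_pk_table above.
+assert(partitionKeysValid("id,pt", Seq("pt")))
+// 'id' alone does not contain 'pt', which violates rule 2.
+assert(!partitionKeysValid("id", Seq("pt")))
+```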
+
+#### Multi-Fields Partitioned Table
+
+Fluss also supports multi-field partitioning. The following SQL statement
creates a Multi-Fields Partitioned Table:
+
+```sql title="Spark SQL"
+CREATE TABLE my_multi_part_table (
+ id INT,
+ name STRING,
+ pt1 STRING,
+ pt2 INT
+) PARTITIONED BY (pt1, pt2);
+```
+
+### Table Properties
+
+The following table properties can be specified when creating a table:
+
+| Property     | Required | Description |
+|--------------|----------|-------------|
+| primary.key  | optional | The primary keys of the Fluss table. Multiple columns are separated by commas (e.g., `'col1,col2'`). |
+| bucket.key   | optional | The distribution key of the Fluss table. Data will be distributed to each bucket according to the hash value of the bucket key. Must be a subset of the primary keys (excluding partition keys). If not specified, defaults to the primary key (excluding partition keys) for PK tables, or random distribution for Log tables. |
+| bucket.num   | optional | The number of buckets of the Fluss table. |
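+
+To illustrate how `bucket.key` relates to `primary.key`, here is a sketch that issues the DDL from a Spark application. It assumes an existing `SparkSession` named `spark` whose current catalog is the Fluss catalog; the table name `my_bucketed_pk_table` is made up for this example:
+
+```scala
+// Assumes `spark` is a SparkSession with fluss_catalog configured and selected.
+// The bucket key (shop_id) is a subset of the primary key (shop_id, user_id).
+spark.sql("""
+  CREATE TABLE my_bucketed_pk_table (
+    shop_id BIGINT,
+    user_id BIGINT,
+    num_orders INT
+  ) TBLPROPERTIES (
+    'primary.key' = 'shop_id,user_id',
+    'bucket.key' = 'shop_id',
+    'bucket.num' = '4'
+  )
+""")
+```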
+
+You can also pass additional custom properties and Fluss storage options
through `TBLPROPERTIES`:
+
+```sql title="Spark SQL"
+CREATE TABLE my_table (
+ id INT,
+ name STRING
+) TBLPROPERTIES (
+ 'primary.key' = 'id',
+ 'bucket.num' = '4',
+ 'key1' = 'value1'
+);
+```
+
+## Describe Table
+
+To describe the schema of a table:
+
+```sql title="Spark SQL"
+DESC my_table;
+```
+
+## Show Tables
+
+To list all tables in the current database:
+
+```sql title="Spark SQL"
+SHOW TABLES;
+```
+
+You can also specify a database and use patterns:
+
+```sql title="Spark SQL"
+-- Show tables in a specific database
+SHOW TABLES IN fluss;
+
+-- Show tables matching a pattern
+SHOW TABLES FROM fluss LIKE 'test_*';
+```
+
+## Drop Table
+
+To delete a table, run:
+
+```sql title="Spark SQL"
+DROP TABLE my_table;
+```
+
+This will entirely remove all the data of the table in the Fluss cluster.
+
+## Alter Table
+
+### SET Properties
+
+The `SET TBLPROPERTIES` statement allows users to add or modify table
properties:
+
+```sql title="Spark SQL"
+ALTER TABLE my_table SET TBLPROPERTIES ('key1' = 'value1', 'key2' = 'value2');
+```
+
+### UNSET Properties
+
+The `UNSET TBLPROPERTIES` statement allows users to remove table properties:
+
+```sql title="Spark SQL"
+ALTER TABLE my_table UNSET TBLPROPERTIES ('key1', 'key2');
+```
+
+:::note
+Most table properties prefixed with `table.` cannot be modified.
+:::
+
+## Show Partitions
+
+To show all the partitions of a partitioned table, run:
+
+```sql title="Spark SQL"
+SHOW PARTITIONS my_part_log_table;
+```
+
+For multi-field partitioned tables, you can use the `SHOW PARTITIONS` command
with a partial partition filter to list matching partitions:
+
+```sql title="Spark SQL"
+-- Show partitions using a partial partition filter
+SHOW PARTITIONS my_multi_part_table PARTITION (pt1 = 'a');
+```
+
+## Add Partition
+
+Fluss supports manually adding partitions to an existing partitioned table:
+
+```sql title="Spark SQL"
+-- Add a partition
+ALTER TABLE my_multi_part_table ADD PARTITION (pt1 = 'b', pt2 = 1);
+
+-- Add a partition if not exists
+ALTER TABLE my_multi_part_table ADD IF NOT EXISTS PARTITION (pt1 = 'b', pt2 = 1);
+```
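+
+When many partitions are needed, the statements can be generated instead of written by hand. This is a sketch only: `addDailyPartitions` is a hypothetical helper, and it assumes a table partitioned by a single `STRING` column named `dt`, such as `my_part_log_table` above:
+
+```scala
+import java.time.LocalDate
+
+// Hypothetical helper: builds one ADD PARTITION statement per day in
+// [start, end] for a table partitioned by a STRING column `dt`.
+def addDailyPartitions(table: String, start: LocalDate, end: LocalDate): Seq[String] =
+  Iterator.iterate(start)(_.plusDays(1))
+    .takeWhile(day => !day.isAfter(end))
+    .map(day => s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION (dt = '$day')")
+    .toSeq
+
+val statements = addDailyPartitions(
+  "my_part_log_table", LocalDate.parse("2026-01-01"), LocalDate.parse("2026-01-03"))
+
+// With a SparkSession `spark` whose current catalog is the Fluss catalog:
+// statements.foreach(stmt => spark.sql(stmt))
+```
+
+Using `IF NOT EXISTS` keeps the loop safe to re-run when some partitions already exist.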
+
+## Drop Partition
+
+To drop a partition from a partitioned table:
+
+```sql title="Spark SQL"
+-- Drop a partition
+ALTER TABLE my_multi_part_table DROP PARTITION (pt1 = 'a', pt2 = 2);
+
+-- Drop a partition if exists
+ALTER TABLE my_multi_part_table DROP IF EXISTS PARTITION (pt1 = 'a', pt2 = 2);
+```
+
+:::note
+Spark does not support dropping partial partitions. You must specify all
partition fields when dropping a partition.
+:::
diff --git a/website/docs/engine-spark/getting-started.md
b/website/docs/engine-spark/getting-started.md
new file mode 100644
index 000000000..e93fe0c86
--- /dev/null
+++ b/website/docs/engine-spark/getting-started.md
@@ -0,0 +1,207 @@
+---
+sidebar_label: "Getting Started"
+title: "Getting Started with Spark"
+sidebar_position: 1
+---
+
+# Getting Started with Spark Engine
+
+## Supported Spark Versions
+| Fluss Connector Versions | Supported Spark Versions |
+|--------------------------|--------------------------|
+| $FLUSS_VERSION_SHORT$ | 3.4, 3.5 |
+
+
+## Feature Support
+Fluss supports Apache Spark's SQL API and Spark Structured Streaming.
+
+| Feature Support                                                        | Spark | Notes                           |
+|------------------------------------------------------------------------|-------|---------------------------------|
+| [SQL Create Catalog](ddl.md#create-catalog)                            | ✔️    |                                 |
+| [SQL Create Database](ddl.md#create-database)                          | ✔️    |                                 |
+| [SQL Drop Database](ddl.md#drop-database)                              | ✔️    |                                 |
+| [SQL Create Table](ddl.md#create-table)                                | ✔️    |                                 |
+| [SQL Drop Table](ddl.md#drop-table)                                    | ✔️    |                                 |
+| [SQL Describe Table](ddl.md#describe-table)                            | ✔️    |                                 |
+| [SQL Show Tables](ddl.md#show-tables)                                  | ✔️    |                                 |
+| [SQL Alter Table](ddl.md#alter-table)                                  | ✔️    | SET/UNSET TBLPROPERTIES         |
+| [SQL Show Partitions](ddl.md#show-partitions)                          | ✔️    |                                 |
+| [SQL Add Partition](ddl.md#add-partition)                              | ✔️    |                                 |
+| [SQL Drop Partition](ddl.md#drop-partition)                            | ✔️    |                                 |
+| [SQL Select (Batch)](reads.md)                                         | ✔️    | Log table and primary-key table |
+| [SQL Insert Into](writes.md)                                           | ✔️    | Log table and primary-key table |
+| [Structured Streaming Read](structured-streaming.md#streaming-read)    | ✔️    | Log table and primary-key table |
+| [Structured Streaming Write](structured-streaming.md#streaming-write)  | ✔️    | Log table and primary-key table |
+
+
+## Preparation when using Spark SQL
+
+- **Download Spark**
+
+Spark runs on all UNIX-like environments, such as Linux and macOS. You can download the binary release of Spark from the [Apache Spark Downloads](https://spark.apache.org/downloads.html) page, then extract the archive:
+
+```shell
+tar -xzf spark-3.5.7-bin-hadoop3.tgz
+```
+
+- **Copy Fluss Spark Bundled Jar**
+
+Download the [Fluss Spark Bundled jar](/downloads) and copy it to the `jars` directory of your Spark home.
+
+```shell
+cp fluss-spark-3.5_2.12-$FLUSS_VERSION$.jar <SPARK_HOME>/jars/
+```
+
+- **Start Spark SQL**
+
+To quickly start the Spark SQL CLI, you can use the provided script:
+
+```shell
+<SPARK_HOME>/bin/spark-sql \
+ --conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
+ --conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
+ --conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
+```
+
+Or start Spark Shell:
+
+```shell
+<SPARK_HOME>/bin/spark-shell \
+ --conf spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog \
+ --conf spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123 \
+ --conf spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
+```
+
+## Creating a Catalog
+
+The Fluss catalog can be configured in `spark-defaults.conf` or passed as
command-line arguments.
+
+Using `spark-defaults.conf`:
+
+```properties
+spark.sql.catalog.fluss_catalog=org.apache.fluss.spark.SparkCatalog
+spark.sql.catalog.fluss_catalog.bootstrap.servers=localhost:9123
+spark.sql.extensions=org.apache.fluss.spark.FlussSparkSessionExtensions
+```
+
+Or configure it programmatically (Scala shown here):
+
+```scala
+val spark = SparkSession.builder()
+ .config("spark.sql.catalog.fluss_catalog",
"org.apache.fluss.spark.SparkCatalog")
+ .config("spark.sql.catalog.fluss_catalog.bootstrap.servers",
"localhost:9123")
+ .config("spark.sql.extensions",
"org.apache.fluss.spark.FlussSparkSessionExtensions")
+ .getOrCreate()
+```
+
+:::note
+1. `spark.sql.catalog.fluss_catalog.bootstrap.servers` specifies the Fluss server address. Start the Fluss server before you configure `bootstrap.servers`. See [Deploying Fluss](install-deploy/overview.md#how-to-deploy-fluss) for how to set up a Fluss cluster. The examples here assume that a Fluss cluster is running on your local machine and that the CoordinatorServer port is 9123.
+2. The `spark.sql.catalog.fluss_catalog.bootstrap.servers` configuration is used to discover all nodes within the Fluss cluster. It can be set to one or more (up to three) Fluss server addresses (either CoordinatorServer or TabletServer), separated by commas.
+:::
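+
+As a sketch of the second point, several server addresses can be joined into one comma-separated value. The host names below are placeholders chosen for illustration, not addresses from a real deployment:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Placeholder host names; replace them with your own CoordinatorServer
+// or TabletServer addresses.
+val bootstrapServers = Seq("fluss-server-1:9123", "fluss-server-2:9123").mkString(",")
+
+val spark = SparkSession.builder()
+  .config("spark.sql.catalog.fluss_catalog", "org.apache.fluss.spark.SparkCatalog")
+  .config("spark.sql.catalog.fluss_catalog.bootstrap.servers", bootstrapServers)
+  .config("spark.sql.extensions", "org.apache.fluss.spark.FlussSparkSessionExtensions")
+  .getOrCreate()
+```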
+
+## Creating a Database
+
+```sql title="Spark SQL"
+USE fluss_catalog;
+```
+
+```sql title="Spark SQL"
+CREATE DATABASE fluss_db;
+USE fluss_db;
+```
+
+## Creating a Table
+
+```sql title="Spark SQL"
+CREATE TABLE pk_table (
+ shop_id BIGINT,
+ user_id BIGINT,
+ num_orders INT,
+ total_amount INT
+) TBLPROPERTIES (
+ 'primary.key' = 'shop_id,user_id',
+ 'bucket.num' = '4'
+);
+```
+
+## Data Writing
+
+To append new data to a table, you can use `INSERT INTO`:
+
+```sql title="Spark SQL"
+INSERT INTO pk_table VALUES
+ (1234, 1234, 1, 1),
+ (12345, 12345, 2, 2),
+ (123456, 123456, 3, 3);
+```
+
+## Data Reading
+
+To retrieve data, you can use a `SELECT` statement:
+
+```sql title="Spark SQL"
+SELECT * FROM pk_table ORDER BY shop_id;
+```
+
+To read a subset of the data with projection and filter:
+
+```sql title="Spark SQL"
+SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1;
+```
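+
+Because the table is resolved through the Spark catalog, the same query can presumably also be written with the DataFrame API. A minimal sketch, assuming the `SparkSession` named `spark` configured earlier and the `pk_table` created in `fluss_db` on this page:
+
+```scala
+import org.apache.spark.sql.functions.col
+
+// Equivalent of: SELECT shop_id, total_amount FROM pk_table WHERE num_orders > 1
+val result = spark.table("fluss_catalog.fluss_db.pk_table")
+  .where(col("num_orders") > 1)
+  .select("shop_id", "total_amount")
+
+result.show()
+```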
+
+
+## Type Conversion
+
+Fluss's integration for Spark automatically converts between Spark and Fluss
types.
+
+### Fluss -> Apache Spark
+
+| Fluss | Spark |
+|---------------|------------------|
+| BOOLEAN | BooleanType |
+| TINYINT | ByteType |
+| SMALLINT | ShortType |
+| INT | IntegerType |
+| BIGINT | LongType |
+| FLOAT | FloatType |
+| DOUBLE | DoubleType |
+| CHAR | CharType |
+| STRING | StringType |
+| DECIMAL | DecimalType |
+| DATE | DateType |
+| TIMESTAMP | TimestampNTZType |
+| TIMESTAMP_LTZ | TimestampType |
+| BYTES | BinaryType |
+| ARRAY | ArrayType |
+| MAP | MapType |
+| ROW | StructType |
+
+:::note
+The `MAP` type is currently supported for table creation and schema mapping,
but **read and write operations on MAP columns are not yet supported**. Full
MAP type read/write support will be available soon.
+:::
+
+### Apache Spark -> Fluss
+
+| Spark | Fluss |
+|------------------|-----------------------------------------------|
+| BooleanType | BOOLEAN |
+| ByteType | TINYINT |
+| ShortType | SMALLINT |
+| IntegerType | INT |
+| LongType | BIGINT |
+| FloatType | FLOAT |
+| DoubleType | DOUBLE |
+| CharType | CHAR |
+| StringType | STRING |
+| VarcharType | STRING |
+| DecimalType | DECIMAL |
+| DateType | DATE |
+| TimestampType | TIMESTAMP_LTZ |
+| TimestampNTZType | TIMESTAMP |
+| BinaryType | BYTES |
+| ArrayType | ARRAY |
+| MapType | MAP (read/write not yet supported) |
+| StructType | ROW |
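+
+To see the mapping for a concrete table, the Spark schema can be inspected from an application. This sketch assumes the `SparkSession` named `spark` and the `pk_table` in `fluss_db` from the sections above:
+
+```scala
+// Load the table's Spark-side schema through the Fluss catalog.
+val schema = spark.table("fluss_catalog.fluss_db.pk_table").schema
+
+// Print each column with the Spark type it was mapped to; per the table
+// above, a Fluss BIGINT column is expected to appear as LongType.
+schema.fields.foreach(f => println(s"${f.name}: ${f.dataType}"))
+```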
diff --git a/website/docs/engine-spark/options.md
b/website/docs/engine-spark/options.md
new file mode 100644
index 000000000..b1f74cbc5
--- /dev/null
+++ b/website/docs/engine-spark/options.md
@@ -0,0 +1,19 @@
+---
+sidebar_label: Connector Options
+title: Spark Connector Options
+sidebar_position: 7
+---
+
+# Spark Connector Options
+
+This page lists all the available options for the Fluss Spark connector.
+
+## Read Options
+
+The following Spark configurations can be used to control read behavior for
both batch and streaming reads. These options are set using `SET` in Spark SQL
or via `spark.conf.set()` in Spark applications. All options are prefixed with
`spark.sql.fluss.`.
+
+| Option | Default | Description |
+|--------|---------|-------------|
+| `spark.sql.fluss.scan.startup.mode` | `full` | The startup mode when reading a Fluss table. Supported values: <ul><li>`full` (default): For primary key tables, reads the full snapshot and merges with log changes. For log tables, reads from the earliest offset.</li><li>`earliest`: Reads from the earliest log/changelog offset.</li><li>`latest`: Reads from the latest log/changelog offset.</li><li>`timestamp`: Reads from a specified timestamp (requires `scan.startup.timestamp`).</li></ul>* [...]
+| `spark.sql.fluss.read.optimized` | `false` | If `true`, Spark will only read data from the data lake snapshot or KV snapshot, without merging log changes. This can improve read performance but may return stale data for primary key tables. |
+| `spark.sql.fluss.scan.poll.timeout` | `10000ms` | The timeout for the log scanner to poll records. |
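+
+As a sketch of the `spark.conf.set()` route mentioned above, assuming an existing `SparkSession` named `spark` with the Fluss catalog configured. The value `5000ms` is an arbitrary example that follows the format of the documented default:
+
+```scala
+// Equivalent to `SET spark.sql.fluss.scan.poll.timeout=5000ms` in Spark SQL.
+spark.conf.set("spark.sql.fluss.scan.poll.timeout", "5000ms")
+
+// Read the value back; the "10000ms" fallback mirrors the documented default.
+println(spark.conf.get("spark.sql.fluss.scan.poll.timeout", "10000ms"))
+```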
diff --git a/website/docs/engine-spark/procedures.md
b/website/docs/engine-spark/procedures.md
index 0d8b01047..ace2d3a6d 100644
--- a/website/docs/engine-spark/procedures.md
+++ b/website/docs/engine-spark/procedures.md
@@ -4,7 +4,7 @@ title: Procedures
sidebar_position: 3
---
-# Procedures
+# Spark Procedures
Fluss provides stored procedures to perform administrative and management
operations through Spark SQL. All procedures are located in the `sys` namespace
and can be invoked using the `CALL` statement.
diff --git a/website/docs/engine-spark/reads.md
b/website/docs/engine-spark/reads.md
new file mode 100644
index 000000000..26cc1c5fd
--- /dev/null
+++ b/website/docs/engine-spark/reads.md
@@ -0,0 +1,259 @@
+---
+sidebar_label: Reads
+title: Spark Reads
+sidebar_position: 5
+---
+
+# Spark Reads
+
+Fluss supports batch read with [Apache Spark](https://spark.apache.org/)'s SQL
API for both Log Tables and Primary Key Tables.
+
+:::tip
+For streaming read, see the [Structured Streaming
Read](structured-streaming.md#streaming-read) section.
+:::
+
+## Limitations
+
+:::caution
+- For tables with datalake enabled (`table.datalake.enabled = true`), batch
read can only read data stored in the Fluss cluster and cannot read data that
has been tiered to the underlying lake storage (e.g., Paimon, Iceberg). Reading
the full dataset including lake data will be supported in a future release.
+:::
+
+## Batch Read
+
+### Log Table
+
+You can read data from a log table using the `SELECT` statement.
+
+#### Example
+
+1. Create a table and prepare data:
+
+```sql title="Spark SQL"
+CREATE TABLE log_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING
+);
+```
+
+```sql title="Spark SQL"
+INSERT INTO log_table VALUES
+ (600, 21, 601, 'addr1'),
+ (700, 22, 602, 'addr2'),
+ (800, 23, 603, 'addr3'),
+ (900, 24, 604, 'addr4'),
+ (1000, 25, 605, 'addr5');
+```
+
+2. Query data:
+
+```sql title="Spark SQL"
+SELECT * FROM log_table ORDER BY order_id;
+```
+
+#### Projection
+
+Column projection minimizes I/O by reading only the columns used in a query:
+
+```sql title="Spark SQL"
+SELECT address, item_id FROM log_table ORDER BY order_id;
+```
+
+#### Filter
+
+Filters are applied to reduce the amount of data read:
+
+```sql title="Spark SQL"
+SELECT * FROM log_table WHERE amount % 2 = 0 ORDER BY order_id;
+```
+
+#### Projection + Filter
+
+Projection and filter can be combined for efficient queries:
+
+```sql title="Spark SQL"
+SELECT order_id, item_id FROM log_table
+WHERE order_id >= 900 ORDER BY order_id;
+```
+
+### Partitioned Log Table
+
+Reading from partitioned log tables supports partition filtering:
+
+```sql title="Spark SQL"
+CREATE TABLE part_log_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING,
+ dt STRING
+) PARTITIONED BY (dt);
+```
+
+```sql title="Spark SQL"
+INSERT INTO part_log_table VALUES
+ (600, 21, 601, 'addr1', '2026-01-01'),
+ (700, 22, 602, 'addr2', '2026-01-01'),
+ (800, 23, 603, 'addr3', '2026-01-02'),
+ (900, 24, 604, 'addr4', '2026-01-02'),
+ (1000, 25, 605, 'addr5', '2026-01-03');
+```
+
+```sql title="Spark SQL"
+-- Read with partition filter
+SELECT * FROM part_log_table WHERE dt = '2026-01-01' ORDER BY order_id;
+```
+
+```sql title="Spark SQL"
+-- Read with multiple partitions filter
+SELECT order_id, address, dt FROM part_log_table
+WHERE dt IN ('2026-01-01', '2026-01-02')
+ORDER BY order_id;
+```
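+
+The multi-partition filter can also be sketched with the DataFrame API. This assumes a `SparkSession` named `spark` and that `part_log_table` was created in the default `fluss` database, which this page does not state explicitly:
+
+```scala
+import org.apache.spark.sql.functions.col
+
+// Equivalent of the IN (...) partition filter shown above.
+val df = spark.table("fluss_catalog.fluss.part_log_table")
+  .where(col("dt").isin("2026-01-01", "2026-01-02"))
+  .select("order_id", "address", "dt")
+  .orderBy("order_id")
+
+df.show()
+
+// explain() prints the physical plan, which can help check how the
+// partition filter is handled by the source.
+df.explain()
+```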
+
+### Primary Key Table
+
+The Fluss source supports batch read for primary-key tables. It reads data
from the latest snapshot and merges it with log changes to provide the most
up-to-date view.
+
+#### Example
+
+1. Create a table and prepare data:
+
+```sql title="Spark SQL"
+CREATE TABLE pk_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING
+) TBLPROPERTIES (
+ 'primary.key' = 'order_id',
+ 'bucket.num' = '1'
+);
+```
+
+```sql title="Spark SQL"
+INSERT INTO pk_table VALUES
+ (600, 21, 601, 'addr1'),
+ (700, 22, 602, 'addr2'),
+ (800, 23, 603, 'addr3'),
+ (900, 24, 604, 'addr4'),
+ (1000, 25, 605, 'addr5');
+```
+
+2. Query data:
+
+```sql title="Spark SQL"
+SELECT * FROM pk_table ORDER BY order_id;
+```
+
+3. After upsert, the query reflects the latest values:
+
+```sql title="Spark SQL"
+-- Upsert data
+INSERT INTO pk_table VALUES
+ (700, 220, 602, 'addr2'),
+ (900, 240, 604, 'addr4'),
+ (1100, 260, 606, 'addr6');
+```
+
+```sql title="Spark SQL"
+-- Query reflects the latest data
+SELECT order_id, item_id, address FROM pk_table
+WHERE amount <= 603 ORDER BY order_id;
+```
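+
+The result of step 3 can be reasoned about with a toy model in plain Scala. This is only an illustration of upsert semantics using an in-memory `Map`; the `Order` case class is hypothetical and this is not how Fluss stores or merges data:
+
+```scala
+// Toy model: a primary key table as a Map keyed by order_id.
+case class Order(orderId: Long, itemId: Long, amount: Int, address: String)
+
+val initial = Seq(
+  Order(600, 21, 601, "addr1"), Order(700, 22, 602, "addr2"),
+  Order(800, 23, 603, "addr3"), Order(900, 24, 604, "addr4"),
+  Order(1000, 25, 605, "addr5"))
+val upserts = Seq(
+  Order(700, 220, 602, "addr2"), Order(900, 240, 604, "addr4"),
+  Order(1100, 260, 606, "addr6"))
+
+// Later rows with the same key replace earlier ones.
+val table = (initial ++ upserts)
+  .foldLeft(Map.empty[Long, Order])((m, o) => m + (o.orderId -> o))
+
+// Same projection and filter as the query above.
+val result = table.values.filter(_.amount <= 603).toSeq.sortBy(_.orderId)
+  .map(o => (o.orderId, o.itemId, o.address))
+// result: (600, 21, addr1), (700, 220, addr2), (800, 23, addr3)
+```
+
+Row 700 shows the updated `item_id` of 220, while row 900 was also updated but is excluded by the filter on `amount`.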
+
+### Partitioned Primary Key Table
+
+Reading from partitioned primary key tables also supports partition filtering:
+
+```sql title="Spark SQL"
+CREATE TABLE part_pk_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING,
+ dt STRING
+) PARTITIONED BY (dt) TBLPROPERTIES (
+ 'primary.key' = 'order_id,dt',
+ 'bucket.num' = '1'
+);
+```
+
+```sql title="Spark SQL"
+INSERT INTO part_pk_table VALUES
+ (600, 21, 601, 'addr1', '2026-01-01'),
+ (700, 22, 602, 'addr2', '2026-01-01'),
+ (800, 23, 603, 'addr3', '2026-01-02'),
+ (900, 24, 604, 'addr4', '2026-01-02'),
+ (1000, 25, 605, 'addr5', '2026-01-03');
+```
+
+```sql title="Spark SQL"
+-- Read with partition filter
+SELECT * FROM part_pk_table
+WHERE dt = '2026-01-01'
+ORDER BY order_id;
+```
+
+```sql title="Spark SQL"
+-- Read with multiple partition filters
+SELECT * FROM part_pk_table
+WHERE dt IN ('2026-01-01', '2026-01-02')
+ORDER BY order_id;
+```
+
+## All Data Types
+
+The Fluss Spark connector supports reading Fluss data types, including nested types such as `ARRAY` and `ROW`, with the exception noted below:
+
+:::note
+The `MAP` type is currently **not supported** for read operations. Full MAP
type read support will be available soon.
+:::
+
+```sql title="Spark SQL"
+CREATE TABLE all_types_table (
+ id INT,
+ flag BOOLEAN,
+ small SHORT,
+ value INT,
+ big BIGINT,
+ real FLOAT,
+ amount DOUBLE,
+ name STRING,
+ decimal_val DECIMAL(10, 2),
+ date_val DATE,
+ timestamp_ntz_val TIMESTAMP,
+ timestamp_ltz_val TIMESTAMP_LTZ,
+ arr ARRAY<INT>,
+ struct_col STRUCT<col1: INT, col2: STRING>
+);
+```
+
+```sql title="Spark SQL"
+INSERT INTO all_types_table VALUES
+ (1, true, 100, 1000, 10000, 12.34, 56.78, 'string_val',
+ 123.45, DATE '2026-01-01', TIMESTAMP '2026-01-01 12:00:00', TIMESTAMP
'2026-01-01 12:00:00',
+ ARRAY(1, 2, 3), STRUCT(100, 'nested_value')),
+ (2, false, 200, 2000, 20000, 23.45, 67.89, 'another_str',
+ 223.45, DATE '2026-01-02', TIMESTAMP '2026-01-02 12:00:00', TIMESTAMP
'2026-01-02 12:00:00',
+ ARRAY(4, 5, 6), STRUCT(200, 'nested_value2'));
+```
+
+```sql title="Spark SQL"
+SELECT * FROM all_types_table;
+```
+
+## Read Optimized Mode
+
+For primary key tables, Fluss by default reads the latest snapshot and merges
it with log changes to return the most up-to-date data. You can enable
**read-optimized mode** to skip the merge step and read only snapshot data,
which improves query performance at the cost of data freshness.
+
+```sql title="Spark SQL"
+-- Enable read-optimized mode for primary key tables
+SET spark.sql.fluss.read.optimized=true;
+
+-- Query returns only snapshot data (may be stale)
+SELECT * FROM pk_table;
+```
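+
+In an application, it can be convenient to enable the option only for specific queries. The sketch below uses a hypothetical helper, `withReadOptimized`, that restores the previous setting afterwards; it assumes a `SparkSession` named `spark` and that `pk_table` lives in the default `fluss` database:
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+// Hypothetical helper: enables read-optimized mode only for the duration
+// of `body`, then restores whatever value was set before.
+def withReadOptimized[T](spark: SparkSession)(body: => T): T = {
+  val key = "spark.sql.fluss.read.optimized"
+  val previous = spark.conf.getOption(key)
+  spark.conf.set(key, "true")
+  try body
+  finally previous match {
+    case Some(value) => spark.conf.set(key, value)
+    case None        => spark.conf.unset(key)
+  }
+}
+
+// Usage: count rows from snapshot data only (may lag behind recent writes).
+val snapshotCount = withReadOptimized(spark) {
+  spark.table("fluss_catalog.fluss.pk_table").count()
+}
+```
+
+The action (`count()` here) has to run inside the block, since a lazily evaluated DataFrame returned from it would execute after the setting is restored.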
+
+For more details on all available read options, see the [Connector
Options](options.md#read-options) page.
diff --git a/website/docs/engine-spark/structured-streaming.md
b/website/docs/engine-spark/structured-streaming.md
new file mode 100644
index 000000000..4fcf2fafd
--- /dev/null
+++ b/website/docs/engine-spark/structured-streaming.md
@@ -0,0 +1,144 @@
+---
+sidebar_label: Structured Streaming
+title: Spark Structured Streaming
+sidebar_position: 6
+---
+
+# Spark Structured Streaming
+
+Fluss supports [Spark Structured
Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
for both reading and writing data in a streaming fashion. This enables
building real-time data pipelines with Apache Spark and Fluss.
+
+## Streaming Write
+
+Fluss supports writing data to Fluss tables using Spark Structured Streaming.
This is useful for continuously ingesting data into Fluss tables from streaming
sources.
+
+### Write to Log Table
+
+```scala title="Spark Application"
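+// Here we use MemoryStream to fake a streaming source.
+// MemoryStream needs an implicit SQLContext and an Encoder in scope.
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import spark.implicits._
+implicit val sqlContext: SQLContext = spark.sqlContext
+
+val inputData = MemoryStream[(Int, String)]
+val inputDF = inputData.toDS().toDF("k", "v")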
+
+val query = inputDF.writeStream
+ .option("checkpointLocation", "/path/to/checkpoint")
+ .toTable("fluss_catalog.fluss.log_table")
+
+query.awaitTermination()
+```
+
+### Write to Primary Key Table
+
+When writing to a primary key table, if a record with the same primary key
already exists, it will be updated (upsert semantics):
+
+```scala title="Spark Application"
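+// Here we use MemoryStream to fake a streaming source.
+// MemoryStream needs an implicit SQLContext and an Encoder in scope.
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import spark.implicits._
+implicit val sqlContext: SQLContext = spark.sqlContext
+
+val inputData = MemoryStream[(Int, String)]
+val inputDF = inputData.toDS().toDF("k", "v")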
+
+val query = inputDF.writeStream
+ .option("checkpointLocation", "/path/to/checkpoint")
+ .toTable("fluss_catalog.fluss.pk_table")
+
+query.awaitTermination()
+```
+
+:::note
+Fluss supports exactly-once semantics for streaming writes through Spark's
checkpoint mechanism. Make sure to specify a `checkpointLocation` for fault
tolerance.
+:::
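+
+For a source that keeps producing data without manual input, Spark's built-in `rate` source can stand in for `MemoryStream`. This is a sketch under assumptions: an existing `SparkSession` named `spark`, and a Fluss table `fluss_catalog.fluss.log_table` with columns `(k INT, v STRING)`:
+
+```scala
+// The rate source emits rows with columns (timestamp, value).
+val rateDF = spark.readStream
+  .format("rate")
+  .option("rowsPerSecond", "5")
+  .load()
+  // Reshape to match the assumed (k INT, v STRING) table schema.
+  .selectExpr("CAST(value AS INT) AS k", "CAST(timestamp AS STRING) AS v")
+
+val query = rateDF.writeStream
+  .option("checkpointLocation", "/path/to/checkpoint")
+  .toTable("fluss_catalog.fluss.log_table")
+
+query.awaitTermination()
+```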
+
+## Streaming Read
+
+Fluss supports reading data from Fluss tables using Spark Structured
Streaming. The streaming source continuously reads new data as it arrives.
+
+:::caution Limitations
+- Streaming read currently only supports the `latest` startup mode. Other
modes (`full`, `earliest`, `timestamp`) are not yet supported and will be
available in a future release.
+:::
+
+### Read from Log Table
+
+```scala title="Spark Application"
+val df = spark.readStream
+ .option("scan.startup.mode", "latest")
+ .table("fluss_catalog.fluss.log_table")
+
+val query = df.writeStream
+ .format("console")
+ .start()
+
+query.awaitTermination()
+```
+
+### Read from Primary Key Table
+
+```scala title="Spark Application"
+val df = spark.readStream
+ .option("scan.startup.mode", "latest")
+ .table("fluss_catalog.fluss.pk_table")
+
+val query = df.writeStream
+ .format("console")
+ .start()
+
+query.awaitTermination()
+```
+
+## Trigger Modes
+
+Fluss Spark streaming source supports the following Spark trigger modes:
+
+| Trigger Mode                       | Description                                      |
+|------------------------------------|--------------------------------------------------|
+| Default (micro-batch)              | Processes data as soon as new data is available. |
+| `Trigger.ProcessingTime(interval)` | Processes data at fixed time intervals.          |
+
+**Example:**
+
+```scala title="Spark Application"
+import org.apache.spark.sql.streaming.Trigger
+
+val df = spark.readStream
+ .option("scan.startup.mode", "latest")
+ .table("fluss_catalog.fluss.my_table")
+
+// Processing time trigger (every 5 seconds)
+val query = df.writeStream
+ .format("console")
+ .trigger(Trigger.ProcessingTime("5 seconds"))
+ .start()
+
+query.awaitTermination()
+```
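+
+In place of the blocking `awaitTermination()` call, a bounded wait can be used to inspect and stop the query. A minimal sketch using standard `StreamingQuery` methods, where `query` is the value returned by `start()` above and the 60-second timeout is an arbitrary choice:
+
+```scala
+// Wait up to 60 seconds; returns true if the query terminated in that time.
+val finished = query.awaitTermination(60000L)
+
+if (!finished) {
+  println(query.status)        // current state of the query
+  println(query.lastProgress)  // metrics of the most recent micro-batch (null if none yet)
+  query.stop()                 // stop the streaming query
+}
+```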
+
+## End-to-End Example
+
+Here is a complete example that reads data from one Fluss table and writes to
another:
+
+```scala title="Spark Application"
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder()
+ .config("spark.sql.catalog.fluss_catalog",
"org.apache.fluss.spark.SparkCatalog")
+ .config("spark.sql.catalog.fluss_catalog.bootstrap.servers",
"localhost:9123")
+ .config("spark.sql.defaultCatalog", "fluss_catalog")
+ .config("spark.sql.extensions",
"org.apache.fluss.spark.FlussSparkSessionExtensions")
+ .getOrCreate()
+
+// Create source and sink tables
+spark.sql("CREATE TABLE IF NOT EXISTS source_table (id INT, data STRING)")
+spark.sql("""
+ CREATE TABLE IF NOT EXISTS sink_table (id INT, data STRING)
+ TBLPROPERTIES ('primary.key' = 'id')
+""")
+
+// Read from source table
+val sourceDF = spark.readStream
+ .option("scan.startup.mode", "latest")
+ .table("fluss_catalog.fluss.source_table")
+
+// Write to sink table
+val query = sourceDF.writeStream
+ .option("checkpointLocation", "/tmp/fluss-streaming-checkpoint")
+ .toTable("fluss_catalog.fluss.sink_table")
+
+query.awaitTermination()
+```
diff --git a/website/docs/engine-spark/writes.md
b/website/docs/engine-spark/writes.md
new file mode 100644
index 000000000..3036855f9
--- /dev/null
+++ b/website/docs/engine-spark/writes.md
@@ -0,0 +1,115 @@
+---
+sidebar_label: Writes
+title: Spark Writes
+sidebar_position: 4
+---
+
+# Spark Writes
+
+You can directly insert data into a Fluss table using the `INSERT INTO`
statement.
+Fluss primary key tables support upsert semantics (inserting a row with an
existing primary key will update the existing record), while Fluss log tables
only accept append-only writes.
+
+## INSERT INTO
+
+`INSERT INTO` statements are used to write data to Fluss tables in batch mode.
+They are compatible with primary-key tables (for upserting data) as well as
log tables (for appending data).
+
+### Appending Data to the Log Table
+
+#### Create a Log Table
+
+```sql title="Spark SQL"
+CREATE TABLE log_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING
+);
+```
+
+#### Insert Data into the Log Table
+
+```sql title="Spark SQL"
+INSERT INTO log_table VALUES
+ (600, 21, 601, 'addr1'),
+ (700, 22, 602, 'addr2'),
+ (800, 23, 603, 'addr3'),
+ (900, 24, 604, 'addr4'),
+ (1000, 25, 605, 'addr5');
+```
+
+### Upserting Data into the Primary Key Table
+
+#### Create a Primary Key Table
+
+```sql title="Spark SQL"
+CREATE TABLE pk_table (
+ order_id BIGINT,
+ item_id BIGINT,
+ amount INT,
+ address STRING
+) TBLPROPERTIES (
+ 'primary.key' = 'order_id',
+ 'bucket.num' = '1'
+);
+```
+
+#### Insert Data
+
+```sql title="Spark SQL"
+INSERT INTO pk_table VALUES
+ (600, 21, 601, 'addr1'),
+ (700, 22, 602, 'addr2'),
+ (800, 23, 603, 'addr3');
+```
+
+#### Upsert Data
+
+When inserting data with the same primary key, the existing record will be
updated:
+
+```sql title="Spark SQL"
+-- This will update the records with order_id 700 and 800
+INSERT INTO pk_table VALUES
+ (700, 220, 602, 'addr2'),
+ (800, 230, 603, 'addr3');
+```
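+
+From a Spark application, the same upsert could be sketched with `DataFrameWriterV2`, whose `append()` corresponds to `INSERT INTO` in Spark. That the Fluss connector handles this path the same way as the SQL statement is an assumption here, as is the location of `pk_table` in the default `fluss` database:
+
+```scala
+import spark.implicits._
+
+// Rows with existing primary keys (700 and 800) are expected to be updated.
+val updates = Seq(
+  (700L, 220L, 602, "addr2"),
+  (800L, 230L, 603, "addr3")
+).toDF("order_id", "item_id", "amount", "address")
+
+// Append the rows to the Fluss table through the catalog.
+updates.writeTo("fluss_catalog.fluss.pk_table").append()
+```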
+
+### All Data Types
+
+The Fluss Spark connector supports writing Fluss data types, including nested types such as `ARRAY` and `ROW`, with the exception noted below. Here is an example of writing various data types:
+
+:::note
+The `MAP` type is currently **not supported** for write operations. Full MAP
type write support will be available soon.
+:::
+
+```sql title="Spark SQL"
+CREATE TABLE all_types_table (
+ bool_col BOOLEAN,
+ tinyint_col BYTE,
+ smallint_col SHORT,
+ int_col INT,
+ bigint_col BIGINT,
+ float_col FLOAT,
+ double_col DOUBLE,
+ decimal_col DECIMAL(10, 2),
+ decimal_large_col DECIMAL(38, 2),
+ string_col STRING,
+ ts_col TIMESTAMP,
+ array_col ARRAY<FLOAT>,
+ struct_col STRUCT<id: BIGINT, name: STRING>
+);
+```
+
+```sql title="Spark SQL"
+INSERT INTO all_types_table VALUES (
+ true, 1, 10, 100, 1000, 12.3, 45.6,
+ 1234567.89, 12345678900987654321.12,
+ 'test',
+ TIMESTAMP '2025-12-31 10:00:00',
+ array(11.11, 22.22), struct(123, 'apache fluss')
+);
+```
+
+## See Also
+
+- [Structured Streaming Write](structured-streaming.md#streaming-write) for
continuous streaming writes to Fluss tables.
diff --git a/website/docusaurus.config.ts b/website/docusaurus.config.ts
index 959c9e5c8..44a8ed0b6 100644
--- a/website/docusaurus.config.ts
+++ b/website/docusaurus.config.ts
@@ -227,7 +227,7 @@ const config: Config = {
prism: {
theme: prismThemes.vsDark,
darkTheme: prismThemes.dracula,
- additionalLanguages: ['java', 'bash']
+ additionalLanguages: ['java', 'bash', 'scala']
},
algolia: {
appId: "X8KSGGLJW1",