This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c04dd90f3 [doc] Introduce dedicate page for bucketed append
c04dd90f3 is described below

commit c04dd90f3990a47f1fe40e65a5e7b448cf7f195b
Author: Jingsong <[email protected]>
AuthorDate: Mon Aug 19 11:36:01 2024 +0800

    [doc] Introduce dedicate page for bucketed append
---
 .../append-table/{streaming.md => bucketed.md}     |  82 +++++------
 docs/content/append-table/streaming.md             | 151 +--------------------
 2 files changed, 38 insertions(+), 195 deletions(-)

diff --git a/docs/content/append-table/streaming.md 
b/docs/content/append-table/bucketed.md
similarity index 76%
copy from docs/content/append-table/streaming.md
copy to docs/content/append-table/bucketed.md
index 3758c7f56..1a2828901 100644
--- a/docs/content/append-table/streaming.md
+++ b/docs/content/append-table/bucketed.md
@@ -1,9 +1,9 @@
 ---
-title: "Streaming"
-weight: 2
+title: "Bucketed"
+weight: 5
 type: docs
 aliases:
-- /append-table/streaming.html
+- /append-table/bucketed.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,49 +24,9 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Streaming
+# Bucketed Append
 
-You can streaming write to the Append table in a very flexible way through 
Flink, or through read the Append table
-Flink, using it like a queue. The only difference is that its latency is in 
minutes. Its advantages are very low cost
-and the ability to push down filters and projection.
-
-## Automatic small file merging
-
-In streaming writing job, without bucket definition, there is no compaction in 
writer, instead, will use
-`Compact Coordinator` to scan the small files and pass compaction task to 
`Compact Worker`. In streaming mode, if you
-run insert sql in flink, the topology will be like this:
-
-{{< img src="/img/unaware-bucket-topo.png">}}
-
-Do not worry about backpressure, compaction never backpressure.
-
-If you set `write-only` to true, the `Compact Coordinator` and `Compact 
Worker` will be removed in the topology.
-
-The auto compaction is only supported in Flink engine streaming mode. You can 
also start a compaction job in flink by
-flink action in paimon and disable all the other compaction by set 
`write-only`.
-
-## Streaming Query
-
-You can stream the Append table and use it like a Message Queue. As with 
primary key tables, there are two options
-for streaming reads:
-1. By default, Streaming read produces the latest snapshot on the table upon 
first startup, and continue to read the
-   latest incremental records.
-2. You can specify `scan.mode` or `scan.snapshot-id` or 
`scan.timestamp-millis` or `scan.file-creation-time-millis` to
-   streaming read incremental only.
-
-Similar to flink-kafka, order is not guaranteed by default, if your data has 
some sort of order requirement, you also
-need to consider defining a `bucket-key`.
-
-## Bucketed Append
-
-An ordinary Append table has no strict ordering guarantees for its streaming 
writes and reads, but there are some cases
-where you need to define a key similar to Kafka's.
-
-You can define the `bucket` and `bucket-key` to get a bucketed append table. 
Every record in the same bucket is ordered
-strictly, streaming read will transfer the record to down-stream exactly in 
the order of writing. To use this mode, you
-do not need to config special configurations, all the data will go into one 
bucket as a queue.
-
-{{< img src="/img/for-queue.png">}}
+You can define the `bucket` and `bucket-key` to get a bucketed append table.
 
 Example to create bucketed append table:
 
@@ -86,6 +46,17 @@ CREATE TABLE my_table (
 {{< /tab >}}
 {{< /tabs >}}
 
+## Streaming
+
+An ordinary Append table has no strict ordering guarantees for its streaming 
writes and reads, but there are some cases
+where you need to define a key similar to Kafka's.
+
+Every record in the same bucket is ordered strictly, streaming read will 
transfer the record to down-stream exactly in
+the order of writing. To use this mode, you do not need to config special 
configurations, all the data will go into one
+bucket as a queue.
+
+{{< img src="/img/for-queue.png">}}
+
 ### Compaction in Bucket
 
 By default, the sink node will automatically perform compaction to control the 
number of files. The following options
@@ -205,3 +176,24 @@ INSERT INTO paimon_table SELECT * FROM kakfa_table;
 -- launch a bounded streaming job to read paimon_table
 SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
 ```
+
+## Batch
+
+Bucketed table can be used to avoid shuffle if necessary in batch query, for 
example, you can use the following Spark
+SQL to read a Paimon table:
+
+```sql
+SET spark.sql.sources.v2.bucketing.enabled = true;
+
+CREATE TABLE FACT_TABLE (order_id INT, f1 STRING) TBLPROPERTIES 
('bucket'='10', 'bucket-key' = 'order_id');
+
+CREATE TABLE DIM_TABLE (order_id INT, f2 STRING) TBLPROPERTIES ('bucket'='10', 
'primary-key' = 'order_id');
+
+SELECT * FROM FACT_TABLE JOIN DIM_TABLE on t1.order_id = t4.order_id;
+```
+
+The `spark.sql.sources.v2.bucketing.enabled` config is used to enable 
bucketing for V2 data sources. When turned on,
+Spark will recognize the specific distribution reported by a V2 data source 
through SupportsReportPartitioning, and
+will try to avoid shuffle if necessary.
+
+The costly join shuffle will be avoided if two tables have same bucketing 
strategy and same number of buckets.
diff --git a/docs/content/append-table/streaming.md 
b/docs/content/append-table/streaming.md
index 3758c7f56..acafb5b02 100644
--- a/docs/content/append-table/streaming.md
+++ b/docs/content/append-table/streaming.md
@@ -55,153 +55,4 @@ for streaming reads:
    streaming read incremental only.
 
 Similar to flink-kafka, order is not guaranteed by default, if your data has 
some sort of order requirement, you also
-need to consider defining a `bucket-key`.
-
-## Bucketed Append
-
-An ordinary Append table has no strict ordering guarantees for its streaming 
writes and reads, but there are some cases
-where you need to define a key similar to Kafka's.
-
-You can define the `bucket` and `bucket-key` to get a bucketed append table. 
Every record in the same bucket is ordered
-strictly, streaming read will transfer the record to down-stream exactly in 
the order of writing. To use this mode, you
-do not need to config special configurations, all the data will go into one 
bucket as a queue.
-
-{{< img src="/img/for-queue.png">}}
-
-Example to create bucketed append table:
-
-{{< tabs "create-bucketed-append" >}}
-{{< tab "Flink" >}}
-
-```sql
-CREATE TABLE my_table (
-    product_id BIGINT,
-    price DOUBLE,
-    sales BIGINT
-) WITH (
-    'bucket' = '8',
-    'bucket-key' = 'product_id'
-);
-```
-{{< /tab >}}
-{{< /tabs >}}
-
-### Compaction in Bucket
-
-By default, the sink node will automatically perform compaction to control the 
number of files. The following options
-control the strategy of compaction:
-
-<table class="configuration table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 10%">Type</th>
-            <th class="text-left" style="width: 55%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>write-only</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>If set to true, compactions and snapshot expiration will be 
skipped. This option is used along with dedicated compact jobs.</td>
-        </tr>
-        <tr>
-            <td><h5>compaction.min.file-num</h5></td>
-            <td style="word-wrap: break-word;">5</td>
-            <td>Integer</td>
-            <td>For file set [f_0,...,f_N], the minimum file number which 
satisfies sum(size(f_i)) &gt;= targetFileSize to trigger a compaction for 
append table. This value avoids almost-full-file to be compacted, which is not 
cost-effective.</td>
-        </tr>
-        <tr>
-            <td><h5>compaction.max.file-num</h5></td>
-            <td style="word-wrap: break-word;">5</td>
-            <td>Integer</td>
-            <td>For file set [f_0,...,f_N], the maximum file number to trigger 
a compaction for append table, even if sum(size(f_i)) &lt; targetFileSize. This 
value avoids pending too much small files, which slows down the 
performance.</td>
-        </tr>
-        <tr>
-            <td><h5>full-compaction.delta-commits</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Integer</td>
-            <td>Full compaction will be constantly triggered after delta 
commits.</td>
-        </tr>
-    </tbody>
-</table>
-
-### Streaming Read Order
-
-For streaming reads, records are produced in the following order:
-
-* For any two records from two different partitions
-   * If `scan.plan-sort-partition` is set to true, the record with a smaller 
partition value will be produced first.
-   * Otherwise, the record with an earlier partition creation time will be 
produced first.
-* For any two records from the same partition and the same bucket, the first 
written record will be produced first.
-* For any two records from the same partition but two different buckets, 
different buckets are processed by different tasks, there is no order guarantee 
between them.
-
-### Watermark Definition
-
-You can define watermark for reading Paimon tables:
-
-```sql
-CREATE TABLE t (
-    `user` BIGINT,
-    product STRING,
-    order_time TIMESTAMP(3),
-    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
-) WITH (...);
-
--- launch a bounded streaming job to read paimon_table
-SELECT window_start, window_end, COUNT(`user`) FROM TABLE(
- TUMBLE(TABLE t, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) GROUP BY 
window_start, window_end;
-```
-
-You can also enable [Flink Watermark 
alignment](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/#watermark-alignment-_beta_),
-which will make sure no sources/splits/shards/partitions increase their 
watermarks too far ahead of the rest:
-
-<table class="configuration table table-bordered">
-    <thead>
-        <tr>
-            <th class="text-left" style="width: 20%">Key</th>
-            <th class="text-left" style="width: 15%">Default</th>
-            <th class="text-left" style="width: 10%">Type</th>
-            <th class="text-left" style="width: 55%">Description</th>
-        </tr>
-    </thead>
-    <tbody>
-        <tr>
-            <td><h5>scan.watermark.alignment.group</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>A group of sources to align watermarks.</td>
-        </tr>
-        <tr>
-            <td><h5>scan.watermark.alignment.max-drift</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Duration</td>
-            <td>Maximal drift to align watermarks, before we pause consuming 
from the source/task/partition.</td>
-        </tr>
-    </tbody>
-</table>
-
-### Bounded Stream
-
-Streaming Source can also be bounded, you can specify 'scan.bounded.watermark' 
to define the end condition for bounded streaming mode, stream reading will end 
until a larger watermark snapshot is encountered.
-
-Watermark in snapshot is generated by writer, for example, you can specify a 
kafka source and declare the definition of watermark.
-When using this kafka source to write to Paimon table, the snapshots of Paimon 
table will generate the corresponding watermark,
-so that you can use the feature of bounded watermark when streaming reads of 
this Paimon table.
-
-```sql
-CREATE TABLE kafka_table (
-    `user` BIGINT,
-    product STRING,
-    order_time TIMESTAMP(3),
-    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
-) WITH ('connector' = 'kafka'...);
-
--- launch a streaming insert job
-INSERT INTO paimon_table SELECT * FROM kakfa_table;
-
--- launch a bounded streaming job to read paimon_table
-SELECT * FROM paimon_table /*+ OPTIONS('scan.bounded.watermark'='...') */;
-```
+need to consider defining a `bucket-key`, see [Bucketed Append]({{< ref 
"append-table/bucketed" >}})

Reply via email to