This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f9b3ee01 [doc] Document Flink scan remove normalize (#1906)
1f9b3ee01 is described below

commit 1f9b3ee019bec96d6c2149f11c2ecb00ba875f49
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 29 10:41:32 2023 +0800

    [doc] Document Flink scan remove normalize (#1906)
---
 docs/content/concepts/primary-key-table.md                     |  4 +++-
 docs/content/how-to/querying-tables.md                         |  4 ----
 docs/layouts/shortcodes/generated/core_configuration.html      |  6 ------
 .../shortcodes/generated/flink_connector_configuration.html    |  6 ++++++
 paimon-common/src/main/java/org/apache/paimon/CoreOptions.java |  9 ---------
 .../java/org/apache/paimon/flink/FlinkConnectorOptions.java    | 10 ++++++++++
 .../java/org/apache/paimon/flink/source/DataTableSource.java   |  4 ++--
 7 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index 982af7a30..8a54cfb69 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -283,7 +283,9 @@ However, these merged changes cannot form a complete 
changelog, because we can't
 
 Consider a consumer which calculates the sum on some grouping keys (might not 
be equal to the primary keys). If the consumer only sees a new value `5`, it 
cannot determine what values should be added to the summing result. For 
example, if the old value is `4`, it should add `1` to the result. But if the 
old value is `6`, it should in turn subtract `1` from the result. Old values 
are important for these types of consumers.
 
-To conclude, `none` changelog producers are best suited for consumers such as 
a database system. Flink also has a built-in "normalize" operator which 
persists the values of each key in states. As one can easily tell, this 
operator will be very costly and should be avoided.
+To conclude, `none` changelog producers are best suited for consumers such as 
a database system. Flink also has a 
+built-in "normalize" operator which persists the values of each key in states. 
As one can easily tell, this operator
+will be very costly and should be avoided. (You can force the removal of the "normalize" 
operator by setting `'scan.remove-normalize'` to `true`.)
 
 {{< img src="/img/changelog-producer-none.png">}}
 
diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 7fb01b3d2..a22d40e21 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -284,10 +284,6 @@ SELECT * FROM t /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'
 
 ### Consumer ID
 
-{{< hint info >}}
-This is an experimental feature.
-{{< /hint >}}
-
 You can specify the `consumer-id` when streaming read table:
 ```sql
 SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index a1cd016a9..84bf6cb64 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -213,12 +213,6 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>String</td>
             <td>Specify the key message format of log system with primary 
key.</td>
         </tr>
-        <tr>
-            <td><h5>log.scan.remove-normalize</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to force the removal of the normalize node when 
streaming read. Note: This is dangerous and is likely to cause data errors if 
downstream is used to calculate aggregation and the input is not complete 
changelog.</td>
-        </tr>
         <tr>
             <td><h5>lookup.cache-file-retention</h5></td>
             <td style="word-wrap: break-word;">1 h</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0a5cbd1cd..876735367 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@ under the License.
             <td>Boolean</td>
             <td>If true, flink will push down projection, filters, limit to 
the source. The cost is that it is difficult to reuse the source in a job.</td>
         </tr>
+        <tr>
+            <td><h5>scan.remove-normalize</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to force the removal of the normalize node when 
streaming read. Note: This is dangerous and is likely to cause data errors if 
downstream is used to calculate aggregation and the input is not complete 
changelog.</td>
+        </tr>
         <tr>
             <td><h5>scan.split-enumerator.batch-size</h5></td>
             <td style="word-wrap: break-word;">10</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c7971eed7..eb954bd73 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -474,15 +474,6 @@ public class CoreOptions implements Serializable {
                     .defaultValue(LogChangelogMode.AUTO)
                     .withDescription("Specify the log changelog mode for 
table.");
 
-    public static final ConfigOption<Boolean> LOG_SCAN_REMOVE_NORMALIZE =
-            key("log.scan.remove-normalize")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "Whether to force the removal of the normalize 
node when streaming read."
-                                    + " Note: This is dangerous and is likely 
to cause data errors if downstream"
-                                    + " is used to calculate aggregation and 
the input is not complete changelog.");
-
     public static final ConfigOption<String> LOG_KEY_FORMAT =
             key("log.key.format")
                     .stringType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 3bf489a42..e8f46f440 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -203,6 +203,16 @@ public class FlinkConnectorOptions {
                             "If true, flink sink will use managed memory for 
merge tree; otherwise, "
                                     + "it will create an independent memory 
allocator.");
 
+    public static final ConfigOption<Boolean> SCAN_REMOVE_NORMALIZE =
+            key("scan.remove-normalize")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDeprecatedKeys("log.scan.remove-normalize")
+                    .withDescription(
+                            "Whether to force the removal of the normalize 
node when streaming read."
+                                    + " Note: This is dangerous and is likely 
to cause data errors if downstream"
+                                    + " is used to calculate aggregation and 
the input is not complete changelog.");
+
     /**
      * Weight of writer buffer in managed memory, Flink will compute the 
memory size for writer
      * according to the weight, the actual memory used depends on the running 
environment.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index bcda510f1..10195f1f8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -60,9 +60,9 @@ import java.util.stream.IntStream;
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
@@ -145,7 +145,7 @@ public class DataTableSource extends FlinkTableSource {
                 return ChangelogMode.insertOnly();
             }
 
-            if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
+            if (options.get(SCAN_REMOVE_NORMALIZE)) {
                 return ChangelogMode.all();
             }
 

Reply via email to