This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1f9b3ee01 [doc] Document Flink scan remove normalize (#1906)
1f9b3ee01 is described below
commit 1f9b3ee019bec96d6c2149f11c2ecb00ba875f49
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 29 10:41:32 2023 +0800
[doc] Document Flink scan remove normalize (#1906)
---
docs/content/concepts/primary-key-table.md | 4 +++-
docs/content/how-to/querying-tables.md | 4 ----
docs/layouts/shortcodes/generated/core_configuration.html | 6 ------
.../shortcodes/generated/flink_connector_configuration.html | 6 ++++++
paimon-common/src/main/java/org/apache/paimon/CoreOptions.java | 9 ---------
.../java/org/apache/paimon/flink/FlinkConnectorOptions.java | 10 ++++++++++
.../java/org/apache/paimon/flink/source/DataTableSource.java | 4 ++--
7 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 982af7a30..8a54cfb69 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -283,7 +283,9 @@ However, these merged changes cannot form a complete
changelog, because we can't
Consider a consumer which calculates the sum on some grouping keys (might not
be equal to the primary keys). If the consumer only sees a new value `5`, it
cannot determine what values should be added to the summing result. For
example, if the old value is `4`, it should add `1` to the result. But if the
old value is `6`, it should in turn subtract `1` from the result. Old values
are important for these types of consumers.
-To conclude, `none` changelog producers are best suited for consumers such as
a database system. Flink also has a built-in "normalize" operator which
persists the values of each key in states. As one can easily tell, this
operator will be very costly and should be avoided.
+To conclude, `none` changelog producers are best suited for consumers such as
a database system. Flink also has a
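+built-in "normalize" operator which persists the values of each key in state.
As one can easily tell, this operator
+will be very costly and should be avoided. (You can force the removal of the "normalize"
operator via `'scan.remove-normalize'`, but this is dangerous: it is likely to produce incorrect results if downstream computes aggregations and the input is not a complete changelog.)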
{{< img src="/img/changelog-producer-none.png">}}
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 7fb01b3d2..a22d40e21 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -284,10 +284,6 @@ SELECT * FROM t /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id'
### Consumer ID
-{{< hint info >}}
-This is an experimental feature.
-{{< /hint >}}
-
You can specify the `consumer-id` when streaming read table:
```sql
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index a1cd016a9..84bf6cb64 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -213,12 +213,6 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td>String</td>
<td>Specify the key message format of log system with primary
key.</td>
</tr>
- <tr>
- <td><h5>log.scan.remove-normalize</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to force the removal of the normalize node when
streaming read. Note: This is dangerous and is likely to cause data errors if
downstream is used to calculate aggregation and the input is not complete
changelog.</td>
- </tr>
<tr>
<td><h5>lookup.cache-file-retention</h5></td>
<td style="word-wrap: break-word;">1 h</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 0a5cbd1cd..876735367 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -80,6 +80,12 @@ under the License.
<td>Boolean</td>
<td>If true, flink will push down projection, filters, limit to
the source. The cost is that it is difficult to reuse the source in a job.</td>
</tr>
+ <tr>
+ <td><h5>scan.remove-normalize</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to force the removal of the normalize node when
streaming read. Note: This is dangerous and is likely to cause data errors if
downstream is used to calculate aggregation and the input is not complete
changelog.</td>
+ </tr>
<tr>
<td><h5>scan.split-enumerator.batch-size</h5></td>
<td style="word-wrap: break-word;">10</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index c7971eed7..eb954bd73 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -474,15 +474,6 @@ public class CoreOptions implements Serializable {
.defaultValue(LogChangelogMode.AUTO)
.withDescription("Specify the log changelog mode for
table.");
- public static final ConfigOption<Boolean> LOG_SCAN_REMOVE_NORMALIZE =
- key("log.scan.remove-normalize")
- .booleanType()
- .defaultValue(false)
- .withDescription(
- "Whether to force the removal of the normalize
node when streaming read."
- + " Note: This is dangerous and is likely
to cause data errors if downstream"
- + " is used to calculate aggregation and
the input is not complete changelog.");
-
public static final ConfigOption<String> LOG_KEY_FORMAT =
key("log.key.format")
.stringType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 3bf489a42..e8f46f440 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -203,6 +203,16 @@ public class FlinkConnectorOptions {
"If true, flink sink will use managed memory for
merge tree; otherwise, "
+ "it will create an independent memory
allocator.");
+ public static final ConfigOption<Boolean> SCAN_REMOVE_NORMALIZE =
+ key("scan.remove-normalize")
+ .booleanType()
+ .defaultValue(false)
+ .withDeprecatedKeys("log.scan.remove-normalize")
+ .withDescription(
+ "Whether to force the removal of the normalize
node when streaming read."
+ + " Note: This is dangerous and is likely
to cause data errors if downstream"
+ + " is used to calculate aggregation and
the input is not complete changelog.");
+
/**
* Weight of writer buffer in managed memory, Flink will compute the
memory size for writer
* according to the weight, the actual memory used depends on the running
environment.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index bcda510f1..10195f1f8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -60,9 +60,9 @@ import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.paimon.CoreOptions.LOG_SCAN_REMOVE_NORMALIZE;
import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC;
import static
org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_ASYNC_THREAD_NUMBER;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_REMOVE_NORMALIZE;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_GROUP;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_MAX_DRIFT;
import static
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
@@ -145,7 +145,7 @@ public class DataTableSource extends FlinkTableSource {
return ChangelogMode.insertOnly();
}
- if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
+ if (options.get(SCAN_REMOVE_NORMALIZE)) {
return ChangelogMode.all();
}