This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new be5fc4a3 [FLINK-28712] Default Changelog all when changelog producer is input
be5fc4a3 is described below

commit be5fc4a3e54d56b2a7d2fa67781474763ea8fe2e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 28 12:51:09 2022 +0800

    [FLINK-28712] Default Changelog all when changelog producer is input
    
    This closes #248
---
 docs/content/docs/development/streaming-query.md   |  30 +++++-
 docs/static/img/changelog-producer-input.png       | Bin 0 -> 318448 bytes
 docs/static/img/changelog-producer-none.png        | Bin 0 -> 297802 bytes
 .../table/store/connector/sink/TableStoreSink.java |  32 +++---
 .../store/connector/source/TableStoreSource.java   |  10 +-
 .../table/store/connector/ChangelogModeTest.java   | 115 +++++++++++++++++++++
 6 files changed, 165 insertions(+), 22 deletions(-)

diff --git a/docs/content/docs/development/streaming-query.md b/docs/content/docs/development/streaming-query.md
index 99b77fb6..a714ee56 100644
--- a/docs/content/docs/development/streaming-query.md
+++ b/docs/content/docs/development/streaming-query.md
@@ -75,17 +75,39 @@ streaming mode. This mode has a lower cost compared to Kafka but has a higher la
 depending on the checkpoint interval of the writing job.
 
 By default, the downstream streaming consumption is disordered (ordered within the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream and want to remove the downstream
+changelog normalization operator (which is costly), you can configure it as follows
+(recommended, but this requires that your input contains the complete changelog):
 
 ```sql
 CREATE TABLE T (...)
 WITH (
-    'changelog-producer' = 'input',
-    'log.changelog-mode' = 'all'
+    'changelog-producer' = 'input'
 )
 ```
 
+The following pictures illustrate the difference between changelog-producer(none) and changelog-producer(input):
+
+{{< img src="/img/changelog-producer-none.png">}}
+
+When the changelog-producer is none, the storage only retains the upsert data and
+does not have the full changelog data containing update_before, so the downstream consumption job needs
+to use the normalization node to generate the complete changelog.
+
+{{< hint info >}}
+__Note:__ The normalization node needs to persist all the data into state, which is very costly.
+{{< /hint >}}
+
+{{< img src="/img/changelog-producer-input.png">}}
+
+When the changelog-producer is input, the storage trusts that the input data is a complete changelog,
+so downstream consumption can also read the complete changelog.
+
+{{< hint info >}}
+__Note:__ You need to ensure that the input is a complete changelog, such as one from database CDC
+or one generated by a Flink stateful computation.
+{{< /hint >}}
+
 ## Streaming Query on Kafka
 
 For a table configuring a log system like Kafka, data will be double written to the file
diff --git a/docs/static/img/changelog-producer-input.png b/docs/static/img/changelog-producer-input.png
new file mode 100644
index 00000000..e7cbec15
Binary files /dev/null and b/docs/static/img/changelog-producer-input.png differ
diff --git a/docs/static/img/changelog-producer-none.png b/docs/static/img/changelog-producer-none.png
new file mode 100644
index 00000000..78d87f1f
Binary files /dev/null and b/docs/static/img/changelog-producer-none.png differ
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index b0062a35..bb26ea36 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -20,13 +20,13 @@ package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
 import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
@@ -45,6 +45,7 @@ import javax.annotation.Nullable;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 
 /** Table sink to create {@link StoreSink}. */
@@ -79,25 +80,22 @@ public class TableStoreSink implements DynamicTableSink, SupportsOverwrite, Supp
             return requestedMode;
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
             Configuration options = Configuration.fromMap(table.schema().options());
-            if (options.get(LOG_CHANGELOG_MODE) != LogChangelogMode.ALL) {
-                // with primary key, default sink upsert
-                ChangelogMode.Builder builder = ChangelogMode.newBuilder();
-                for (RowKind kind : requestedMode.getContainedKinds()) {
-                    if (kind != RowKind.UPDATE_BEFORE) {
-                        builder.addContainedKind(kind);
-                    }
-                }
-                return builder.build();
+            if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
+                return requestedMode;
             }
 
-            // all changelog mode configured
-            if (!requestedMode.contains(RowKind.UPDATE_BEFORE)
-                    || !requestedMode.contains(RowKind.UPDATE_AFTER)) {
-                throw new ValidationException(
-                        "You cannot insert incomplete data into a table that "
-                                + "has primary key and declares all changelog mode.");
+            if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) {
+                return requestedMode;
             }
-            return requestedMode;
+
+            // with primary key, default sink upsert
+            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            for (RowKind kind : requestedMode.getContainedKinds()) {
+                if (kind != RowKind.UPDATE_BEFORE) {
+                    builder.addContainedKind(kind);
+                }
+            }
+            return builder.build();
         } else {
             throw new UnsupportedOperationException(
                     "Unknown FileStoreTable subclass " + table.getClass().getName());
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index b173f1ba..8914e93c 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDo
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.ResolvedExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
 import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
 import org.apache.flink.table.store.CoreOptions.LogConsistency;
 import org.apache.flink.table.store.connector.FlinkConnectorOptions;
@@ -50,6 +51,7 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
 
@@ -116,9 +118,15 @@ public class TableStoreSource
         } else if (table instanceof ChangelogValueCountFileStoreTable) {
             return ChangelogMode.all();
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+            Configuration options = Configuration.fromMap(table.schema().options());
+
+            if (logStoreTableFactory == null
+                    && options.get(CHANGELOG_PRODUCER) != ChangelogProducer.NONE) {
+                return ChangelogMode.all();
+            }
+
             // optimization: transaction consistency and all changelog mode avoid the generation of
             // normalized nodes. See TableStoreSink.getChangelogMode validation.
-            Configuration options = Configuration.fromMap(table.schema().options());
             return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
                             && options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL
                     ? ChangelogMode.all()
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java
new file mode 100644
index 00000000..8f461ef9
--- /dev/null
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for changelog mode with flink source and sink. */
+public class ChangelogModeTest {
+
+    @TempDir java.nio.file.Path temp;
+
+    private final ObjectIdentifier identifier = ObjectIdentifier.of("c", "d", "t");
+
+    private Path path;
+
+    @BeforeEach
+    public void beforeEach() {
+        path = new Path(temp.toUri().toString());
+    }
+
+    private void test(Configuration options, ChangelogMode expectSource, ChangelogMode expectSink)
+            throws Exception {
+        test(options, expectSource, expectSink, null);
+    }
+
+    private void test(
+            Configuration options,
+            ChangelogMode expectSource,
+            ChangelogMode expectSink,
+            @Nullable LogStoreTableFactory logStoreTableFactory)
+            throws Exception {
+        new SchemaManager(path)
+                .commitNewVersion(
+                        new UpdateSchema(
+                                RowType.of(new IntType(), new IntType()),
+                                Collections.emptyList(),
+                                Collections.singletonList("f0"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table = FileStoreTableFactory.create(path);
+
+        TableStoreSource source =
+                new TableStoreSource(identifier, table, true, null, logStoreTableFactory);
+        assertThat(source.getChangelogMode()).isEqualTo(expectSource);
+
+        TableStoreSink sink = new TableStoreSink(identifier, table, null, null);
+        assertThat(sink.getChangelogMode(ChangelogMode.all())).isEqualTo(expectSink);
+    }
+
+    @Test
+    public void testDefault() throws Exception {
+        test(new Configuration(), ChangelogMode.upsert(), ChangelogMode.upsert());
+    }
+
+    @Test
+    public void testInputChangelogProducer() throws Exception {
+        Configuration options = new Configuration();
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+        test(options, ChangelogMode.all(), ChangelogMode.all());
+    }
+
+    @Test
+    public void testChangelogModeAll() throws Exception {
+        Configuration options = new Configuration();
+        options.set(CoreOptions.LOG_CHANGELOG_MODE, CoreOptions.LogChangelogMode.ALL);
+        test(options, ChangelogMode.all(), ChangelogMode.all());
+    }
+
+    @Test
+    public void testInputChangelogProducerWithLog() throws Exception {
+        Configuration options = new Configuration();
+        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
+        test(options, ChangelogMode.upsert(), ChangelogMode.all(), new KafkaLogStoreFactory());
+    }
+}

Reply via email to