This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new be5fc4a3 [FLINK-28712] Default Changelog all when changelog producer
is input
be5fc4a3 is described below
commit be5fc4a3e54d56b2a7d2fa67781474763ea8fe2e
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 28 12:51:09 2022 +0800
[FLINK-28712] Default Changelog all when changelog producer is input
This closes #248
---
docs/content/docs/development/streaming-query.md | 30 +++++-
docs/static/img/changelog-producer-input.png | Bin 0 -> 318448 bytes
docs/static/img/changelog-producer-none.png | Bin 0 -> 297802 bytes
.../table/store/connector/sink/TableStoreSink.java | 32 +++---
.../store/connector/source/TableStoreSource.java | 10 +-
.../table/store/connector/ChangelogModeTest.java | 115 +++++++++++++++++++++
6 files changed, 165 insertions(+), 22 deletions(-)
diff --git a/docs/content/docs/development/streaming-query.md
b/docs/content/docs/development/streaming-query.md
index 99b77fb6..a714ee56 100644
--- a/docs/content/docs/development/streaming-query.md
+++ b/docs/content/docs/development/streaming-query.md
@@ -75,17 +75,39 @@ streaming mode. This mode has a lower cost compared to
Kafka but has a higher la
depending on the checkpoint interval of the writing job.
By default, the downstream streaming consumption is disordered (ordered within
the key)
-stream of upsert data. If you expect an ordered CDC data stream, you can
configure it
-as follows (recommended):
+stream of upsert data. If you expect an ordered CDC data stream and want to remove the downstream
+changelog normalize operator (which is costly), you can configure it as follows
+(recommended, but this requires that your input contains the complete changelog):
```sql
CREATE TABLE T (...)
WITH (
- 'changelog-producer' = 'input',
- 'log.changelog-mode' = 'all'
+ 'changelog-producer' = 'input'
)
```
+The following pictures illustrate the difference between changelog-producer(none) and changelog-producer(input):
+
+{{< img src="/img/changelog-producer-none.png">}}
+
+When the changelog-producer is none, the storage only retains the upsert data and
+does not have the full changelog data containing update_before, so the downstream consumption job needs
+to use the normalized node to generate the complete changelog.
+
+{{< hint info >}}
+__Note:__ The normalized node needs to persist all the data in state, which is very costly.
+{{< /hint >}}
+
+{{< img src="/img/changelog-producer-input.png">}}
+
+When the changelog-producer is input, the storage trusts that the input data is a complete changelog,
+so that downstream consumption can also read the complete changelog.
+
+{{< hint info >}}
+__Note:__ You need to ensure that the input is a complete changelog, such as one from database CDC,
+or one generated by a Flink stateful computation.
+{{< /hint >}}
+
## Streaming Query on Kafka
For a table configuring a log system like Kafka, data will be double written
to the file
diff --git a/docs/static/img/changelog-producer-input.png
b/docs/static/img/changelog-producer-input.png
new file mode 100644
index 00000000..e7cbec15
Binary files /dev/null and b/docs/static/img/changelog-producer-input.png differ
diff --git a/docs/static/img/changelog-producer-none.png
b/docs/static/img/changelog-producer-none.png
new file mode 100644
index 00000000..78d87f1f
Binary files /dev/null and b/docs/static/img/changelog-producer-none.png differ
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
index b0062a35..bb26ea36 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/TableStoreSink.java
@@ -20,13 +20,13 @@ package org.apache.flink.table.store.connector.sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
@@ -45,6 +45,7 @@ import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
/** Table sink to create {@link StoreSink}. */
@@ -79,25 +80,22 @@ public class TableStoreSink implements DynamicTableSink,
SupportsOverwrite, Supp
return requestedMode;
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
Configuration options =
Configuration.fromMap(table.schema().options());
- if (options.get(LOG_CHANGELOG_MODE) != LogChangelogMode.ALL) {
- // with primary key, default sink upsert
- ChangelogMode.Builder builder = ChangelogMode.newBuilder();
- for (RowKind kind : requestedMode.getContainedKinds()) {
- if (kind != RowKind.UPDATE_BEFORE) {
- builder.addContainedKind(kind);
- }
- }
- return builder.build();
+ if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
+ return requestedMode;
}
- // all changelog mode configured
- if (!requestedMode.contains(RowKind.UPDATE_BEFORE)
- || !requestedMode.contains(RowKind.UPDATE_AFTER)) {
- throw new ValidationException(
- "You cannot insert incomplete data into a table that "
- + "has primary key and declares all changelog
mode.");
+ if (options.get(LOG_CHANGELOG_MODE) == LogChangelogMode.ALL) {
+ return requestedMode;
}
- return requestedMode;
+
+ // with primary key, default sink upsert
+ ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+ for (RowKind kind : requestedMode.getContainedKinds()) {
+ if (kind != RowKind.UPDATE_BEFORE) {
+ builder.addContainedKind(kind);
+ }
+ }
+ return builder.build();
} else {
throw new UnsupportedOperationException(
"Unknown FileStoreTable subclass " +
table.getClass().getName());
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
index b173f1ba..8914e93c 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/source/TableStoreSource.java
@@ -30,6 +30,7 @@ import
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDo
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.CoreOptions.LogChangelogMode;
import org.apache.flink.table.store.CoreOptions.LogConsistency;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
@@ -50,6 +51,7 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.flink.table.store.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
@@ -116,9 +118,15 @@ public class TableStoreSource
} else if (table instanceof ChangelogValueCountFileStoreTable) {
return ChangelogMode.all();
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
+ Configuration options =
Configuration.fromMap(table.schema().options());
+
+ if (logStoreTableFactory == null
+ && options.get(CHANGELOG_PRODUCER) !=
ChangelogProducer.NONE) {
+ return ChangelogMode.all();
+ }
+
// optimization: transaction consistency and all changelog mode
avoid the generation of
// normalized nodes. See TableStoreSink.getChangelogMode
validation.
- Configuration options =
Configuration.fromMap(table.schema().options());
return options.get(LOG_CONSISTENCY) == LogConsistency.TRANSACTIONAL
&& options.get(LOG_CHANGELOG_MODE) ==
LogChangelogMode.ALL
? ChangelogMode.all()
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java
new file mode 100644
index 00000000..8f461ef9
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ChangelogModeTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.connector.sink.TableStoreSink;
+import org.apache.flink.table.store.connector.source.TableStoreSource;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
+import org.apache.flink.table.store.log.LogStoreTableFactory;
+import org.apache.flink.table.store.table.FileStoreTable;
+import org.apache.flink.table.store.table.FileStoreTableFactory;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for changelog mode with flink source and sink. */
+public class ChangelogModeTest {
+
+ @TempDir java.nio.file.Path temp;
+
+ private final ObjectIdentifier identifier = ObjectIdentifier.of("c", "d",
"t");
+
+ private Path path;
+
+ @BeforeEach
+ public void beforeEach() {
+ path = new Path(temp.toUri().toString());
+ }
+
+ private void test(Configuration options, ChangelogMode expectSource,
ChangelogMode expectSink)
+ throws Exception {
+ test(options, expectSource, expectSink, null);
+ }
+
+ private void test(
+ Configuration options,
+ ChangelogMode expectSource,
+ ChangelogMode expectSink,
+ @Nullable LogStoreTableFactory logStoreTableFactory)
+ throws Exception {
+ new SchemaManager(path)
+ .commitNewVersion(
+ new UpdateSchema(
+ RowType.of(new IntType(), new IntType()),
+ Collections.emptyList(),
+ Collections.singletonList("f0"),
+ options.toMap(),
+ ""));
+ FileStoreTable table = FileStoreTableFactory.create(path);
+
+ TableStoreSource source =
+ new TableStoreSource(identifier, table, true, null,
logStoreTableFactory);
+ assertThat(source.getChangelogMode()).isEqualTo(expectSource);
+
+ TableStoreSink sink = new TableStoreSink(identifier, table, null,
null);
+
assertThat(sink.getChangelogMode(ChangelogMode.all())).isEqualTo(expectSink);
+ }
+
+ @Test
+ public void testDefault() throws Exception {
+ test(new Configuration(), ChangelogMode.upsert(),
ChangelogMode.upsert());
+ }
+
+ @Test
+ public void testInputChangelogProducer() throws Exception {
+ Configuration options = new Configuration();
+ options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.INPUT);
+ test(options, ChangelogMode.all(), ChangelogMode.all());
+ }
+
+ @Test
+ public void testChangelogModeAll() throws Exception {
+ Configuration options = new Configuration();
+ options.set(CoreOptions.LOG_CHANGELOG_MODE,
CoreOptions.LogChangelogMode.ALL);
+ test(options, ChangelogMode.all(), ChangelogMode.all());
+ }
+
+ @Test
+ public void testInputChangelogProducerWithLog() throws Exception {
+ Configuration options = new Configuration();
+ options.set(CoreOptions.CHANGELOG_PRODUCER,
CoreOptions.ChangelogProducer.INPUT);
+ test(options, ChangelogMode.upsert(), ChangelogMode.all(), new
KafkaLogStoreFactory());
+ }
+}