This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 41338cc [FLINK-26457] Introduce PartialUpdateMergeFunction
41338cc is described below
commit 41338cc0d13810becdd2390ab809e5cee9b59712
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 22 09:55:15 2022 +0800
[FLINK-26457] Introduce PartialUpdateMergeFunction
This closes #98
---
docs/content/docs/development/create-table.md | 35 ++++++++
.../flink/table/store/connector/TableStore.java | 32 +++++++-
.../store/connector/ContinuousFileStoreITCase.java | 38 ++-------
.../store/connector/FileStoreTableITCase.java | 61 ++++++++++++++
.../table/store/connector/PartialUpdateITCase.java | 94 ++++++++++++++++++++++
.../flink/table/store/file/FileStoreOptions.java | 43 ++++++++++
.../compact/DeduplicateMergeFunction.java | 2 +
...nction.java => PartialUpdateMergeFunction.java} | 31 +++++--
.../mergetree/compact/ValueCountMergeFunction.java | 2 +
9 files changed, 298 insertions(+), 40 deletions(-)
diff --git a/docs/content/docs/development/create-table.md
b/docs/content/docs/development/create-table.md
index 8b94f4f..95f5e87 100644
--- a/docs/content/docs/development/create-table.md
+++ b/docs/content/docs/development/create-table.md
@@ -233,3 +233,38 @@ The two methods do not behave in the same way when
querying.
Use approach one if you have a large number of filtered queries
with only `user_id`, and use approach two if you have a large
number of filtered queries with only `catalog_id`.
+
+## Partial Update
+
+You can configure partial update from options:
+
+```sql
+CREATE TABLE MyTable (
+ product_id BIGINT,
+ price DOUBLE,
+ number BIGINT,
+ detail STRING,
+ PRIMARY KEY (product_id) NOT ENFORCED
+) WITH (
+ 'merge-engine' = 'partial-update'
+);
+```
+
+{{< hint info >}}
+__Note:__ Partial update is only supported for table with primary key.
+{{< /hint >}}
+
+{{< hint info >}}
+__Note:__ Partial update is not supported for streaming consuming.
+{{< /hint >}}
+
+The value fields are updated to the latest data one by one
+under the same primary key, but null values are not overwritten.
+
+For example, the inputs:
+- <1, 23.0, 10, NULL>
+- <1, NULL, 20, 'This is a book'>
+- <1, 25.2, NULL, NULL>
+
+Output:
+- <1, 25.2, 20, 'This is a book'>
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
index 37315c4..c17a309 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/TableStore.java
@@ -31,6 +31,7 @@ import
org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogLock;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.Projection;
@@ -44,8 +45,10 @@ import
org.apache.flink.table.store.connector.source.LogHybridSourceFactory;
import
org.apache.flink.table.store.connector.source.StaticFileStoreSplitEnumerator;
import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.FileStoreImpl;
+import org.apache.flink.table.store.file.FileStoreOptions.MergeEngine;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.log.LogOptions.LogStartupMode;
@@ -68,6 +71,8 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.store.file.FileStoreOptions.BUCKET;
import static
org.apache.flink.table.store.file.FileStoreOptions.CONTINUOUS_DISCOVERY_INTERVAL;
+import static org.apache.flink.table.store.file.FileStoreOptions.MERGE_ENGINE;
+import static
org.apache.flink.table.store.file.FileStoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.flink.table.store.log.LogOptions.LOG_PREFIX;
import static org.apache.flink.table.store.log.LogOptions.SCAN;
@@ -164,6 +169,10 @@ public class TableStore {
return new SinkBuilder();
}
+ private MergeEngine mergeEngine() {
+ return options.get(MERGE_ENGINE);
+ }
+
private FileStore buildFileStore() {
RowType partitionType = TypeUtils.project(type, partitions);
RowType keyType;
@@ -190,7 +199,23 @@ public class TableStore {
f.getDescription().orElse(null)))
.collect(Collectors.toList()));
valueType = type;
- mergeFunction = new DeduplicateMergeFunction();
+
+ switch (mergeEngine()) {
+ case DEDUPLICATE:
+ mergeFunction = new DeduplicateMergeFunction();
+ break;
+ case PARTIAL_UPDATE:
+ List<LogicalType> fieldTypes = type.getChildren();
+ RowData.FieldGetter[] fieldGetters = new
RowData.FieldGetter[fieldTypes.size()];
+ for (int i = 0; i < fieldTypes.size(); i++) {
+ fieldGetters[i] =
RowData.createFieldGetter(fieldTypes.get(i), i);
+ }
+ mergeFunction = new
PartialUpdateMergeFunction(fieldGetters);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported merge engine: " + mergeEngine());
+ }
}
return new FileStoreImpl(
tableIdentifier, options, user, partitionType, keyType,
valueType, mergeFunction);
@@ -300,6 +325,11 @@ public class TableStore {
private Source<RowData, ?, ?> buildSource() {
if (isContinuous) {
+ if (primaryKeys.length > 0 && mergeEngine() == PARTIAL_UPDATE)
{
+ throw new ValidationException(
+ "Partial update continuous reading is not
supported.");
+ }
+
LogStartupMode startupMode = logOptions().get(SCAN);
if (logSourceProvider == null) {
return buildFileSource(true, startupMode ==
LogStartupMode.LATEST);
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
index f873e8e..0be5e18 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/ContinuousFileStoreITCase.java
@@ -18,50 +18,26 @@
package org.apache.flink.table.store.connector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.store.file.utils.BlockingIterator;
-import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
-import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
-import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
-import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
-import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** SQL ITCase for continuous file store. */
-public class ContinuousFileStoreITCase extends AbstractTestBase {
-
- private TableEnvironment bEnv;
- private TableEnvironment sEnv;
-
- @Before
- public void before() throws IOException {
- bEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
- sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
- sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
- String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
- prepareEnv(bEnv, path);
- prepareEnv(sEnv, path);
- }
+public class ContinuousFileStoreITCase extends FileStoreTableITCase {
- private void prepareEnv(TableEnvironment env, String path) {
- Configuration config = env.getConfig().getConfiguration();
-
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
- config.setString(TABLE_STORE_PREFIX + PATH.key(), path);
- env.executeSql("CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c
STRING)");
- env.executeSql(
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T1 (a STRING, b STRING, c STRING)",
"CREATE TABLE IF NOT EXISTS T2 (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)");
}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
new file mode 100644
index 0000000..2fd5092
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileStoreTableITCase.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.store.file.FileStoreOptions.PATH;
+import static
org.apache.flink.table.store.file.FileStoreOptions.TABLE_STORE_PREFIX;
+
+/** ITCase for file store table api. */
+public abstract class FileStoreTableITCase extends AbstractTestBase {
+
+ protected TableEnvironment bEnv;
+ protected TableEnvironment sEnv;
+
+ @Before
+ public void before() throws IOException {
+ bEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
+ String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+ prepareEnv(bEnv, path);
+ prepareEnv(sEnv, path);
+ }
+
+ private void prepareEnv(TableEnvironment env, String path) {
+ Configuration config = env.getConfig().getConfiguration();
+
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
+ config.setString(TABLE_STORE_PREFIX + PATH.key(), path);
+ ddl().forEach(env::executeSql);
+ }
+
+ protected abstract List<String> ddl();
+}
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java
new file mode 100644
index 0000000..7d4094a
--- /dev/null
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PartialUpdateITCase.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.CollectionUtil.iteratorToList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** ITCase for partial update. */
+public class PartialUpdateITCase extends FileStoreTableITCase {
+
+ @Override
+ protected List<String> ddl() {
+ return Collections.singletonList(
+ "CREATE TABLE IF NOT EXISTS T ("
+ + "j INT, k INT, a INT, b INT, c STRING, PRIMARY KEY
(j,k) NOT ENFORCED)"
+ + " WITH ('merge-engine'='partial-update');");
+ }
+
+ @Test
+ public void testMergeInMemory() throws ExecutionException,
InterruptedException {
+ bEnv.executeSql(
+ "INSERT INTO T VALUES "
+ + "(1, 2, 3, CAST(NULL AS INT), '5'), "
+ + "(1, 2, CAST(NULL AS INT), 6, CAST(NULL AS
STRING))")
+ .await();
+ List<Row> result = iteratorToList(bEnv.from("T").execute().collect());
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 3, 6, "5"));
+ }
+
+ @Test
+ public void testMergeRead() throws ExecutionException,
InterruptedException {
+ bEnv.executeSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT),
CAST(NULL AS STRING))")
+ .await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS
STRING))").await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT),
'6')").await();
+
+ List<Row> result = iteratorToList(bEnv.from("T").execute().collect());
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"));
+ }
+
+ @Test
+ public void testMergeCompaction() throws ExecutionException,
InterruptedException {
+ // Wait compaction
+ bEnv.executeSql("ALTER TABLE T SET ('commit.force-compact'='true')");
+
+ // key 1 2
+ bEnv.executeSql("INSERT INTO T VALUES (1, 2, 3, CAST(NULL AS INT),
CAST(NULL AS STRING))")
+ .await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, 5, CAST(NULL AS
STRING))").await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 2, 4, CAST(NULL AS INT),
'6')").await();
+
+ // key 1 3
+ bEnv.executeSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 1,
'1')").await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 3, 2, 3, CAST(NULL AS
STRING))").await();
+ bEnv.executeSql("INSERT INTO T VALUES (1, 3, CAST(NULL AS INT), 4,
CAST(NULL AS STRING))")
+ .await();
+
+ List<Row> result = iteratorToList(bEnv.from("T").execute().collect());
+ assertThat(result)
+ .containsExactlyInAnyOrder(Row.of(1, 2, 4, 5, "6"), Row.of(1,
3, 2, 4, "1"));
+ }
+
+ @Test
+ public void testStreamingRead() {
+ assertThatThrownBy(
+ () -> sEnv.from("T").execute().print(),
+ "Partial update continuous reading is not supported");
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
index 0b91c9b..eb8c908 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/FileStoreOptions.java
@@ -21,7 +21,10 @@ package org.apache.flink.table.store.file;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.InlineElement;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.store.file.format.FileFormat;
@@ -35,6 +38,8 @@ import java.util.Map;
import java.util.Set;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
+import static org.apache.flink.table.store.utils.OptionsUtils.formatEnumOption;
/** Options for {@link FileStore}. */
public class FileStoreOptions implements Serializable {
@@ -111,6 +116,19 @@ public class FileStoreOptions implements Serializable {
.defaultValue(Duration.ofSeconds(1))
.withDescription("The discovery interval of continuous
reading.");
+ public static final ConfigOption<MergeEngine> MERGE_ENGINE =
+ ConfigOptions.key("merge-engine")
+ .enumType(MergeEngine.class)
+ .defaultValue(MergeEngine.DEDUPLICATE)
+ .withDescription(
+ Description.builder()
+ .text("Specifies the merge engine for
table with primary key.")
+ .linebreak()
+ .list(
+
formatEnumOption(MergeEngine.DEDUPLICATE),
+
formatEnumOption(MergeEngine.PARTIAL_UPDATE))
+ .build());
+
private final Configuration options;
public static Set<ConfigOption<?>> allOptions() {
@@ -207,4 +225,29 @@ public class FileStoreOptions implements Serializable {
public int manifestMergeMinCount() {
return options.get(MANIFEST_MERGE_MIN_COUNT);
}
+
+ /** Specifies the merge engine for table with primary key. */
+ public enum MergeEngine implements DescribedEnum {
+ DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
+
+ PARTIAL_UPDATE("partial-update", "Partial update non-null fields.");
+
+ private final String value;
+ private final String description;
+
+ MergeEngine(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
index b9befdf..fb9af1e 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
@@ -28,6 +28,8 @@ import javax.annotation.Nullable;
*/
public class DeduplicateMergeFunction implements MergeFunction {
+ private static final long serialVersionUID = 1L;
+
private RowData latestValue;
@Override
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
similarity index 60%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
index b9befdf..7ba7717 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/DeduplicateMergeFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/PartialUpdateMergeFunction.java
@@ -18,36 +18,51 @@
package org.apache.flink.table.store.file.mergetree.compact;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
/**
- * A {@link MergeFunction} where key is primary key (unique) and value is the
full record, only keep
- * the latest one.
+ * A {@link MergeFunction} where key is primary key (unique) and value is the
partial record, update
+ * non-null fields on merge.
*/
-public class DeduplicateMergeFunction implements MergeFunction {
+public class PartialUpdateMergeFunction implements MergeFunction {
- private RowData latestValue;
+ private static final long serialVersionUID = 1L;
+
+ private final RowData.FieldGetter[] getters;
+
+ private transient GenericRowData row;
+
+ public PartialUpdateMergeFunction(RowData.FieldGetter[] getters) {
+ this.getters = getters;
+ }
@Override
public void reset() {
- latestValue = null;
+ this.row = new GenericRowData(getters.length);
}
@Override
public void add(RowData value) {
- latestValue = value;
+ for (int i = 0; i < getters.length; i++) {
+ Object field = getters[i].getFieldOrNull(value);
+ if (field != null) {
+ row.setField(i, field);
+ }
+ }
}
@Override
@Nullable
public RowData getValue() {
- return latestValue;
+ return row;
}
@Override
public MergeFunction copy() {
- return new DeduplicateMergeFunction();
+ // RowData.FieldGetter is thread safe
+ return new PartialUpdateMergeFunction(getters);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
index b62157a..aceddd5 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/ValueCountMergeFunction.java
@@ -31,6 +31,8 @@ import static
org.apache.flink.util.Preconditions.checkArgument;
*/
public class ValueCountMergeFunction implements MergeFunction {
+ private static final long serialVersionUID = 1L;
+
private long total;
@Override