This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 07d642933 [doc][flink] Improve some small content (#1740)
07d642933 is described below
commit 07d642933f155018e8416ad39421791ddcbf8e08
Author: monster <[email protected]>
AuthorDate: Mon Aug 7 17:56:08 2023 +0800
[doc][flink] Improve some small content (#1740)
---
docs/content/engines/hive.md | 2 +-
docs/content/how-to/altering-tables.md | 4 +--
docs/content/how-to/creating-tables.md | 4 +--
docs/content/how-to/querying-tables.md | 8 ++---
docs/content/how-to/writing-tables.md | 12 +++----
docs/content/maintenance/rescale-bucket.md | 4 +--
.../apache/paimon/flink/PredicateConverter.java | 10 ++++--
.../action/cdc/mysql/MySqlSyncTableAction.java | 3 +-
.../flink/kafka/KafkaLogDeserializationSchema.java | 3 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 2 +-
.../flink/sink/RowDataStoreWriteOperator.java | 3 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 2 +-
...MultiTableUpdatedDataFieldsProcessFunction.java | 38 ++--------------------
.../sink/cdc/UpdatedDataFieldsProcessFunction.java | 5 +++
.../flink/source/FileStoreSourceSplitReader.java | 9 +++--
.../AlignedContinuousFileSplitEnumerator.java | 4 ++-
.../paimon/flink/source/align/CheckpointEvent.java | 5 +--
17 files changed, 52 insertions(+), 66 deletions(-)
diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index 5c878be19..25e616dd2 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -202,7 +202,7 @@ CREATE TABLE hive_test_table(
a INT COMMENT 'The a field',
b STRING COMMENT 'The b field'
)
-STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
```
## Hive SQL: access Paimon Tables by External Table
diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index d7c9f62e3..61bb3a7d1 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -315,7 +315,7 @@ The following SQL changes comment of column `buy_count` to
`buy count`.
{{< tab "Flink" >}}
```sql
-ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'buy count'
+ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'buy count';
```
{{< /tab >}}
@@ -451,7 +451,7 @@ The following SQL drops the watermark of table `my_table`.
{{< tab "Flink" >}}
```sql
-ALTER TABLE my_table DROP WATERMARK
+ALTER TABLE my_table DROP WATERMARK;
```
{{< /tab >}}
diff --git a/docs/content/how-to/creating-tables.md b/docs/content/how-to/creating-tables.md
index d1a18a015..9e1326ba8 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -362,7 +362,7 @@ CREATE TABLE MyTablePk (
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
-) ;
+);
CREATE TABLE MyTablePkAs WITH ('primary-key' = 'dt,hh') AS SELECT * FROM
MyTablePk;
@@ -453,7 +453,7 @@ CREATE TABLE MyTable (
dt STRING,
hh STRING,
PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
-) ;
+);
CREATE TABLE MyTableLike LIKE MyTable;
```
diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 8c148ce1d..5bc490e55 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -139,7 +139,7 @@ SELECT * FROM t;
SET paimon.scan.timestamp-millis=null;
-- read tag 'my-tag'
-set paimon.scan.tag-name=my-tag:
+set paimon.scan.tag-name=my-tag;
SELECT * FROM t;
set paimon.scan.tag-name=null;
```
@@ -235,7 +235,7 @@ You can also do streaming read without the snapshot data,
you can use `latest` s
{{< tab "Flink" >}}
```sql
-- Continuously reads latest changes without producing a snapshot at the
beginning.
-SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
```
{{< /tab >}}
{{< /tabs >}}
@@ -245,7 +245,7 @@ SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
If you only want to process data for today and beyond, you can do so with
partitioned filters:
```sql
-SELECT * FROM t WHERE dt > '2023-06-26'
+SELECT * FROM t WHERE dt > '2023-06-26';
```
If it's not a partitioned table, or you can't filter by partition, you can use
Time travel's stream read.
@@ -348,7 +348,7 @@ CREATE TABLE orders (
order_id BIGINT,
.....,
PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
-)
+);
```
The query obtains a good acceleration by specifying a range filter for
diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md
index e123f48ed..65897617c 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -33,7 +33,7 @@ be specified by value expressions or result from a query.
## Syntax
```sql
-INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] {
value_expr | query }
+INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] {
value_expr | query };
```
- part_spec
@@ -103,7 +103,7 @@ Other engines will throw respective exception to announce
this. We can use funct
turn a nullable column into a non-null column to escape exception:
```sql
-INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B
+INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B;
```
## Applying Records/Changes to Tables
@@ -195,7 +195,7 @@ You can use `INSERT OVERWRITE` to purge tables by inserting
empty value.
{{< tab "Flink" >}}
```sql
-INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */
SELECT * FROM MyTable WHERE false
+INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */
SELECT * FROM MyTable WHERE false;
```
{{< /tab >}}
@@ -217,7 +217,7 @@ Currently, Paimon supports two ways to purge partitions.
```sql
-- Syntax
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */
-PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable
WHERE false
+PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable
WHERE false;
-- The following SQL is an example:
-- table definition
@@ -229,11 +229,11 @@ CREATE TABLE MyTable (
-- you can use
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */
-PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false
+PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false;
-- or
INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */
-PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false
+PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false;
```
{{< /tab >}}
diff --git a/docs/content/maintenance/rescale-bucket.md b/docs/content/maintenance/rescale-bucket.md
index 7076b18c2..a67a4aa78 100644
--- a/docs/content/maintenance/rescale-bucket.md
+++ b/docs/content/maintenance/rescale-bucket.md
@@ -34,13 +34,13 @@ scan the data with the old bucket number and hash the
record according to the cu
## Rescale Overwrite
```sql
-- rescale number of total buckets
-ALTER TABLE table_identifier SET ('bucket' = '...')
+ALTER TABLE table_identifier SET ('bucket' = '...');
-- reorganize data layout of table/partition
INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
SELECT ...
FROM table_identifier
-[WHERE part_spec]
+[WHERE part_spec];
```
Please note that
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
index b34de03a8..27d498c2f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.regex.Matcher;
@@ -120,12 +121,17 @@ public class PredicateConverter implements
ExpressionVisitor<Predicate> {
.getFamilies()
.contains(LogicalTypeFamily.CHARACTER_STRING)) {
String sqlPattern =
- extractLiteral(fieldRefExpr.getOutputDataType(),
children.get(1))
+ Objects.requireNonNull(
+ extractLiteral(
+
fieldRefExpr.getOutputDataType(), children.get(1)))
.toString();
String escape =
children.size() <= 2
? null
- :
extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2))
+ : Objects.requireNonNull(
+ extractLiteral(
+
fieldRefExpr.getOutputDataType(),
+ children.get(2)))
.toString();
String escapedSqlPattern = sqlPattern;
boolean allowQuick = false;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 433105863..b2a067b16 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -38,6 +38,7 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
@@ -181,7 +182,7 @@ public class MySqlSyncTableAction extends ActionBase {
.collect(Collectors.toList());
List<String> fieldNames = table.schema().fieldNames();
checkArgument(
- fieldNames.containsAll(computedFields),
+ new HashSet<>(fieldNames).containsAll(computedFields),
" Exists Table should contain all computed columns %s,
but are %s.",
computedFields,
fieldNames);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
index d425472e5..22ae1dd86 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import javax.annotation.Nullable;
+import java.util.Objects;
import java.util.stream.IntStream;
/** A {@link KafkaDeserializationSchema} for the table with primary key in log
store. */
@@ -125,7 +126,7 @@ public class KafkaLogDeserializationSchema implements
KafkaDeserializationSchema
Collector<RowData> collector =
projectCollector.project(underCollector);
if (primaryKey.length > 0 && record.value() == null) {
- RowData key = primaryKeyDeserializer.deserialize(record.key());
+ RowData key =
Objects.requireNonNull(primaryKeyDeserializer).deserialize(record.key());
GenericRowData value = new GenericRowData(RowKind.DELETE,
fieldCount);
for (int i = 0; i < primaryKey.length; i++) {
value.setField(primaryKey[i],
keyFieldGetters[i].getFieldOrNull(key));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 1f586aefa..afe9fc917 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -146,7 +146,7 @@ public abstract class FlinkSink<T> implements Serializable {
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
- Boolean writeOnly = table.coreOptions().writeOnly();
+ boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
input.transform(
(writeOnly ? WRITER_WRITE_ONLY_NAME :
WRITER_NAME)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 860cfc2a5..81fa48aa4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.List;
+import java.util.Objects;
/** A {@link PrepareCommitOperator} to write {@link RowData}. Record schema is
fixed. */
public class RowDataStoreWriteOperator extends TableWriteOperator<RowData> {
@@ -184,7 +185,7 @@ public class RowDataStoreWriteOperator extends
TableWriteOperator<RowData> {
if (logCallback != null) {
try {
- logSinkFunction.flush();
+ Objects.requireNonNull(logSinkFunction).flush();
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 35e9229a1..dac073ec7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -66,7 +66,7 @@ public class StoreCommitter implements Committer<Committable,
ManifestCommittabl
}
@Override
- public int filterAndCommit(List<ManifestCommittable> globalCommittables)
throws IOException {
+ public int filterAndCommit(List<ManifestCommittable> globalCommittables) {
return commit.filterAndCommitMultiple(globalCommittables);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index f601f7e2b..870148948 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -28,7 +28,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
-import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,13 +37,14 @@ import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import static
org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction.getSchemaChanges;
+
/**
* A {@link ProcessFunction} to handle schema changes. New schema is
represented by a list of {@link
* DataField}s.
@@ -111,39 +111,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
private List<SchemaChange> extractSchemaChanges(
SchemaManager schemaManager, List<DataField> updatedDataFields) {
- RowType oldRowType = schemaManager.latest().get().logicalRowType();
- Map<String, DataField> oldFields = new HashMap<>();
- for (DataField oldField : oldRowType.getFields()) {
- oldFields.put(oldField.name(), oldField);
- }
-
- List<SchemaChange> result = new ArrayList<>();
- for (DataField newField : updatedDataFields) {
- if (oldFields.containsKey(newField.name())) {
- DataField oldField = oldFields.get(newField.name());
- // we compare by ignoring nullable, because partition keys and
primary keys might be
- // nullable in source database, but they can't be null in
Paimon
- if (oldField.type().equalsIgnoreNullable(newField.type())) {
- if (!Objects.equals(oldField.description(),
newField.description())) {
- result.add(
- SchemaChange.updateColumnComment(
- new String[] {newField.name()},
newField.description()));
- }
- } else {
- result.add(SchemaChange.updateColumnType(newField.name(),
newField.type()));
- if (newField.description() != null) {
- result.add(
- SchemaChange.updateColumnComment(
- new String[] {newField.name()},
newField.description()));
- }
- }
- } else {
- result.add(
- SchemaChange.addColumn(
- newField.name(), newField.type(),
newField.description(), null));
- }
- }
- return result;
+ return getSchemaChanges(updatedDataFields, schemaManager);
}
private void applySchemaChange(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 08db4c70d..2b5312172 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -92,6 +92,11 @@ public class UpdatedDataFieldsProcessFunction extends
ProcessFunction<List<DataF
}
private List<SchemaChange> extractSchemaChanges(List<DataField>
updatedDataFields) {
+ return getSchemaChanges(updatedDataFields, schemaManager);
+ }
+
+ public static List<SchemaChange> getSchemaChanges(
+ List<DataField> updatedDataFields, SchemaManager schemaManager) {
RowType oldRowType = schemaManager.latest().get().logicalRowType();
Map<String, DataField> oldFields = new HashMap<>();
for (DataField oldField : oldRowType.getFields()) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 29090d608..4eb803c0a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -41,6 +41,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
+import java.util.Objects;
import java.util.Queue;
import java.util.Set;
@@ -89,7 +90,10 @@ public class FileStoreSourceSplitReader
nextBatch = currentFirstBatch;
currentFirstBatch = null;
} else {
- nextBatch = reachLimit() ? null :
currentReader.recordReader().readBatch();
+ nextBatch =
+ reachLimit()
+ ? null
+ :
Objects.requireNonNull(currentReader).recordReader().readBatch();
}
if (nextBatch == null) {
pool.recycler().recycle(iterator);
@@ -179,7 +183,8 @@ public class FileStoreSourceSplitReader
private void seek(long toSkip) throws IOException {
while (true) {
- RecordIterator<InternalRow> nextBatch =
currentReader.recordReader().readBatch();
+ RecordIterator<InternalRow> nextBatch =
+
Objects.requireNonNull(currentReader).recordReader().readBatch();
if (nextBatch == null) {
throw new RuntimeException(
String.format(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index af9f71f54..cdb313be3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
@@ -155,7 +156,7 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
}
}
PlanWithNextSnapshotId pendingPlan = pendingPlans.poll();
- addSplits(splitGenerator.createSplits(pendingPlan.plan()));
+
addSplits(splitGenerator.createSplits(Objects.requireNonNull(pendingPlan).plan()));
nextSnapshotId = pendingPlan.nextSnapshotId();
assignSplits();
}
@@ -219,6 +220,7 @@ public class AlignedContinuousFileSplitEnumerator extends
ContinuousFileSplitEnu
PlanWithNextSnapshotId nextPlan = pendingPlans.poll();
if (nextPlan != null) {
nextSnapshotId = nextPlan.nextSnapshotId();
+ Objects.requireNonNull(nextSnapshotId);
TableScan.Plan plan = nextPlan.plan();
if (plan.splits().isEmpty()) {
addSplits(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
index eddce4f08..845fb8278 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
@@ -20,8 +20,6 @@ package org.apache.paimon.flink.source.align;
import org.apache.flink.api.connector.source.SourceEvent;
-import javax.annotation.Nonnull;
-
import java.util.Objects;
/**
@@ -32,13 +30,12 @@ public class CheckpointEvent implements SourceEvent {
private static final long serialVersionUID = 1L;
- @Nonnull private final long checkpointId;
+ private final long checkpointId;
public CheckpointEvent(long checkpointId) {
this.checkpointId = checkpointId;
}
- @Nonnull
public long getCheckpointId() {
return checkpointId;
}