This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 07d642933 [doc][flink] Improve some small content (#1740)
07d642933 is described below

commit 07d642933f155018e8416ad39421791ddcbf8e08
Author: monster <[email protected]>
AuthorDate: Mon Aug 7 17:56:08 2023 +0800

    [doc][flink] Improve some small content (#1740)
---
 docs/content/engines/hive.md                       |  2 +-
 docs/content/how-to/altering-tables.md             |  4 +--
 docs/content/how-to/creating-tables.md             |  4 +--
 docs/content/how-to/querying-tables.md             |  8 ++---
 docs/content/how-to/writing-tables.md              | 12 +++----
 docs/content/maintenance/rescale-bucket.md         |  4 +--
 .../apache/paimon/flink/PredicateConverter.java    | 10 ++++--
 .../action/cdc/mysql/MySqlSyncTableAction.java     |  3 +-
 .../flink/kafka/KafkaLogDeserializationSchema.java |  3 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  2 +-
 .../flink/sink/RowDataStoreWriteOperator.java      |  3 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   |  2 +-
 ...MultiTableUpdatedDataFieldsProcessFunction.java | 38 ++--------------------
 .../sink/cdc/UpdatedDataFieldsProcessFunction.java |  5 +++
 .../flink/source/FileStoreSourceSplitReader.java   |  9 +++--
 .../AlignedContinuousFileSplitEnumerator.java      |  4 ++-
 .../paimon/flink/source/align/CheckpointEvent.java |  5 +--
 17 files changed, 52 insertions(+), 66 deletions(-)

diff --git a/docs/content/engines/hive.md b/docs/content/engines/hive.md
index 5c878be19..25e616dd2 100644
--- a/docs/content/engines/hive.md
+++ b/docs/content/engines/hive.md
@@ -202,7 +202,7 @@ CREATE TABLE hive_test_table(
     a INT COMMENT 'The a field',
     b STRING COMMENT 'The b field'
 )
-STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';
 ```
 
 ## Hive SQL: access Paimon Tables by External Table
diff --git a/docs/content/how-to/altering-tables.md 
b/docs/content/how-to/altering-tables.md
index d7c9f62e3..61bb3a7d1 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -315,7 +315,7 @@ The following SQL changes comment of column `buy_count` to 
`buy count`.
 {{< tab "Flink" >}}
 
 ```sql
-ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'buy count'
+ALTER TABLE my_table MODIFY buy_count BIGINT COMMENT 'buy count';
 ```
 
 {{< /tab >}}
@@ -451,7 +451,7 @@ The following SQL drops the watermark of table `my_table`.
 {{< tab "Flink" >}}
 
 ```sql
-ALTER TABLE my_table DROP WATERMARK
+ALTER TABLE my_table DROP WATERMARK;
 ```
 
 {{< /tab >}}
diff --git a/docs/content/how-to/creating-tables.md 
b/docs/content/how-to/creating-tables.md
index d1a18a015..9e1326ba8 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -362,7 +362,7 @@ CREATE TABLE MyTablePk (
       dt STRING,
       hh STRING,
       PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
-) ;
+);
 CREATE TABLE MyTablePkAs WITH ('primary-key' = 'dt,hh') AS SELECT * FROM 
MyTablePk;
 
 
@@ -453,7 +453,7 @@ CREATE TABLE MyTable (
     dt STRING,
     hh STRING,
     PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
-) ;
+);
 
 CREATE TABLE MyTableLike LIKE MyTable;
 ```
diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index 8c148ce1d..5bc490e55 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -139,7 +139,7 @@ SELECT * FROM t;
 SET paimon.scan.timestamp-millis=null;
     
 -- read tag 'my-tag'
-set paimon.scan.tag-name=my-tag:
+set paimon.scan.tag-name=my-tag;
 SELECT * FROM t;
 set paimon.scan.tag-name=null;
 ```
@@ -235,7 +235,7 @@ You can also do streaming read without the snapshot data, 
you can use `latest` s
 {{< tab "Flink" >}}
 ```sql
 -- Continuously reads latest changes without producing a snapshot at the 
beginning.
-SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
+SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -245,7 +245,7 @@ SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */
 If you only want to process data for today and beyond, you can do so with 
partitioned filters:
 
 ```sql
-SELECT * FROM t WHERE dt > '2023-06-26'
+SELECT * FROM t WHERE dt > '2023-06-26';
 ```
 
 If it's not a partitioned table, or you can't filter by partition, you can use 
Time travel's stream read.
@@ -348,7 +348,7 @@ CREATE TABLE orders (
     order_id BIGINT,
     .....,
     PRIMARY KEY (catalog_id, order_id) NOT ENFORCED -- composite primary key
-)
+);
 ```
 
 The query obtains a good acceleration by specifying a range filter for
diff --git a/docs/content/how-to/writing-tables.md 
b/docs/content/how-to/writing-tables.md
index e123f48ed..65897617c 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -33,7 +33,7 @@ be specified by value expressions or result from a query.
 ## Syntax
 
 ```sql
-INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { 
value_expr | query }
+INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { 
value_expr | query };
 ```
 - part_spec
 
@@ -103,7 +103,7 @@ Other engines will throw respective exception to announce 
this. We can use funct
 turn a nullable column into a non-null column to escape exception:
 
 ```sql
-INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B
+INSERT INTO A key1 SELECT COALESCE(key2, <non-null expression>) FROM B;
 ```
 
 ## Applying Records/Changes to Tables
@@ -195,7 +195,7 @@ You can use `INSERT OVERWRITE` to purge tables by inserting 
empty value.
 {{< tab "Flink" >}}
 
 ```sql
-INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
SELECT * FROM MyTable WHERE false
+INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
SELECT * FROM MyTable WHERE false;
 ```
 
 {{< /tab >}}
@@ -217,7 +217,7 @@ Currently, Paimon supports two ways to purge partitions.
 ```sql
 -- Syntax
 INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
-PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable 
WHERE false
+PARTITION (key1 = value1, key2 = value2, ...) SELECT selectSpec FROM MyTable 
WHERE false;
 
 -- The following SQL is an example:
 -- table definition
@@ -229,11 +229,11 @@ CREATE TABLE MyTable (
 
 -- you can use
 INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
-PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false
+PARTITION (k0 = 0) SELECT k1, v FROM MyTable WHERE false;
 
 -- or
 INSERT OVERWRITE MyTable /*+ OPTIONS('dynamic-partition-overwrite'='false') */ 
-PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false
+PARTITION (k0 = 0, k1 = 0) SELECT v FROM MyTable WHERE false;
 ```
 
 {{< /tab >}}
diff --git a/docs/content/maintenance/rescale-bucket.md 
b/docs/content/maintenance/rescale-bucket.md
index 7076b18c2..a67a4aa78 100644
--- a/docs/content/maintenance/rescale-bucket.md
+++ b/docs/content/maintenance/rescale-bucket.md
@@ -34,13 +34,13 @@ scan the data with the old bucket number and hash the 
record according to the cu
 ## Rescale Overwrite
 ```sql
 -- rescale number of total buckets
-ALTER TABLE table_identifier SET ('bucket' = '...')
+ALTER TABLE table_identifier SET ('bucket' = '...');
 
 -- reorganize data layout of table/partition
 INSERT OVERWRITE table_identifier [PARTITION (part_spec)]
 SELECT ... 
 FROM table_identifier
-[WHERE part_spec]
+[WHERE part_spec];
 ``` 
 
 Please note that
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
index b34de03a8..27d498c2f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/PredicateConverter.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.function.BiFunction;
 import java.util.regex.Matcher;
@@ -120,12 +121,17 @@ public class PredicateConverter implements 
ExpressionVisitor<Predicate> {
                     .getFamilies()
                     .contains(LogicalTypeFamily.CHARACTER_STRING)) {
                 String sqlPattern =
-                        extractLiteral(fieldRefExpr.getOutputDataType(), 
children.get(1))
+                        Objects.requireNonNull(
+                                        extractLiteral(
+                                                
fieldRefExpr.getOutputDataType(), children.get(1)))
                                 .toString();
                 String escape =
                         children.size() <= 2
                                 ? null
-                                : 
extractLiteral(fieldRefExpr.getOutputDataType(), children.get(2))
+                                : Objects.requireNonNull(
+                                                extractLiteral(
+                                                        
fieldRefExpr.getOutputDataType(),
+                                                        children.get(2)))
                                         .toString();
                 String escapedSqlPattern = sqlPattern;
                 boolean allowQuick = false;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 433105863..b2a067b16 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Predicate;
@@ -181,7 +182,7 @@ public class MySqlSyncTableAction extends ActionBase {
                                 .collect(Collectors.toList());
                 List<String> fieldNames = table.schema().fieldNames();
                 checkArgument(
-                        fieldNames.containsAll(computedFields),
+                        new HashSet<>(fieldNames).containsAll(computedFields),
                         " Exists Table should contain all computed columns %s, 
but are %s.",
                         computedFields,
                         fieldNames);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
index d425472e5..22ae1dd86 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogDeserializationSchema.java
@@ -36,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import javax.annotation.Nullable;
 
+import java.util.Objects;
 import java.util.stream.IntStream;
 
 /** A {@link KafkaDeserializationSchema} for the table with primary key in log 
store. */
@@ -125,7 +126,7 @@ public class KafkaLogDeserializationSchema implements 
KafkaDeserializationSchema
         Collector<RowData> collector = 
projectCollector.project(underCollector);
 
         if (primaryKey.length > 0 && record.value() == null) {
-            RowData key = primaryKeyDeserializer.deserialize(record.key());
+            RowData key = 
Objects.requireNonNull(primaryKeyDeserializer).deserialize(record.key());
             GenericRowData value = new GenericRowData(RowKind.DELETE, 
fieldCount);
             for (int i = 0; i < primaryKey.length; i++) {
                 value.setField(primaryKey[i], 
keyFieldGetters[i].getFieldOrNull(key));
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 1f586aefa..afe9fc917 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -146,7 +146,7 @@ public abstract class FlinkSink<T> implements Serializable {
                                 .get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
-        Boolean writeOnly = table.coreOptions().writeOnly();
+        boolean writeOnly = table.coreOptions().writeOnly();
         SingleOutputStreamOperator<Committable> written =
                 input.transform(
                                 (writeOnly ? WRITER_WRITE_ONLY_NAME : 
WRITER_NAME)
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index 860cfc2a5..81fa48aa4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 
 /** A {@link PrepareCommitOperator} to write {@link RowData}. Record schema is 
fixed. */
 public class RowDataStoreWriteOperator extends TableWriteOperator<RowData> {
@@ -184,7 +185,7 @@ public class RowDataStoreWriteOperator extends 
TableWriteOperator<RowData> {
 
         if (logCallback != null) {
             try {
-                logSinkFunction.flush();
+                Objects.requireNonNull(logSinkFunction).flush();
             } catch (Exception e) {
                 throw new IOException(e);
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 35e9229a1..dac073ec7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -66,7 +66,7 @@ public class StoreCommitter implements Committer<Committable, 
ManifestCommittabl
     }
 
     @Override
-    public int filterAndCommit(List<ManifestCommittable> globalCommittables) 
throws IOException {
+    public int filterAndCommit(List<ManifestCommittable> globalCommittables) {
         return commit.filterAndCommitMultiple(globalCommittables);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
index f601f7e2b..870148948 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java
@@ -28,7 +28,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DataTypeRoot;
-import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -38,13 +37,14 @@ import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
+import static 
org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction.getSchemaChanges;
+
 /**
  * A {@link ProcessFunction} to handle schema changes. New schema is 
represented by a list of {@link
  * DataField}s.
@@ -111,39 +111,7 @@ public class MultiTableUpdatedDataFieldsProcessFunction
 
     private List<SchemaChange> extractSchemaChanges(
             SchemaManager schemaManager, List<DataField> updatedDataFields) {
-        RowType oldRowType = schemaManager.latest().get().logicalRowType();
-        Map<String, DataField> oldFields = new HashMap<>();
-        for (DataField oldField : oldRowType.getFields()) {
-            oldFields.put(oldField.name(), oldField);
-        }
-
-        List<SchemaChange> result = new ArrayList<>();
-        for (DataField newField : updatedDataFields) {
-            if (oldFields.containsKey(newField.name())) {
-                DataField oldField = oldFields.get(newField.name());
-                // we compare by ignoring nullable, because partition keys and 
primary keys might be
-                // nullable in source database, but they can't be null in 
Paimon
-                if (oldField.type().equalsIgnoreNullable(newField.type())) {
-                    if (!Objects.equals(oldField.description(), 
newField.description())) {
-                        result.add(
-                                SchemaChange.updateColumnComment(
-                                        new String[] {newField.name()}, 
newField.description()));
-                    }
-                } else {
-                    result.add(SchemaChange.updateColumnType(newField.name(), 
newField.type()));
-                    if (newField.description() != null) {
-                        result.add(
-                                SchemaChange.updateColumnComment(
-                                        new String[] {newField.name()}, 
newField.description()));
-                    }
-                }
-            } else {
-                result.add(
-                        SchemaChange.addColumn(
-                                newField.name(), newField.type(), 
newField.description(), null));
-            }
-        }
-        return result;
+        return getSchemaChanges(updatedDataFields, schemaManager);
     }
 
     private void applySchemaChange(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
index 08db4c70d..2b5312172 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java
@@ -92,6 +92,11 @@ public class UpdatedDataFieldsProcessFunction extends 
ProcessFunction<List<DataF
     }
 
     private List<SchemaChange> extractSchemaChanges(List<DataField> 
updatedDataFields) {
+        return getSchemaChanges(updatedDataFields, schemaManager);
+    }
+
+    public static List<SchemaChange> getSchemaChanges(
+            List<DataField> updatedDataFields, SchemaManager schemaManager) {
         RowType oldRowType = schemaManager.latest().get().logicalRowType();
         Map<String, DataField> oldFields = new HashMap<>();
         for (DataField oldField : oldRowType.getFields()) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 29090d608..4eb803c0a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -41,6 +41,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedList;
+import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 
@@ -89,7 +90,10 @@ public class FileStoreSourceSplitReader
             nextBatch = currentFirstBatch;
             currentFirstBatch = null;
         } else {
-            nextBatch = reachLimit() ? null : 
currentReader.recordReader().readBatch();
+            nextBatch =
+                    reachLimit()
+                            ? null
+                            : 
Objects.requireNonNull(currentReader).recordReader().readBatch();
         }
         if (nextBatch == null) {
             pool.recycler().recycle(iterator);
@@ -179,7 +183,8 @@ public class FileStoreSourceSplitReader
 
     private void seek(long toSkip) throws IOException {
         while (true) {
-            RecordIterator<InternalRow> nextBatch = 
currentReader.recordReader().readBatch();
+            RecordIterator<InternalRow> nextBatch =
+                    
Objects.requireNonNull(currentReader).recordReader().readBatch();
             if (nextBatch == null) {
                 throw new RuntimeException(
                         String.format(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index af9f71f54..cdb313be3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 
@@ -155,7 +156,7 @@ public class AlignedContinuousFileSplitEnumerator extends 
ContinuousFileSplitEnu
                 }
             }
             PlanWithNextSnapshotId pendingPlan = pendingPlans.poll();
-            addSplits(splitGenerator.createSplits(pendingPlan.plan()));
+            
addSplits(splitGenerator.createSplits(Objects.requireNonNull(pendingPlan).plan()));
             nextSnapshotId = pendingPlan.nextSnapshotId();
             assignSplits();
         }
@@ -219,6 +220,7 @@ public class AlignedContinuousFileSplitEnumerator extends 
ContinuousFileSplitEnu
         PlanWithNextSnapshotId nextPlan = pendingPlans.poll();
         if (nextPlan != null) {
             nextSnapshotId = nextPlan.nextSnapshotId();
+            Objects.requireNonNull(nextSnapshotId);
             TableScan.Plan plan = nextPlan.plan();
             if (plan.splits().isEmpty()) {
                 addSplits(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
index eddce4f08..845fb8278 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/CheckpointEvent.java
@@ -20,8 +20,6 @@ package org.apache.paimon.flink.source.align;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 
-import javax.annotation.Nonnull;
-
 import java.util.Objects;
 
 /**
@@ -32,13 +30,12 @@ public class CheckpointEvent implements SourceEvent {
 
     private static final long serialVersionUID = 1L;
 
-    @Nonnull private final long checkpointId;
+    private final long checkpointId;
 
     public CheckpointEvent(long checkpointId) {
         this.checkpointId = checkpointId;
     }
 
-    @Nonnull
     public long getCheckpointId() {
         return checkpointId;
     }

Reply via email to