This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 486ee6f80 [FLINK-36956][transform] Append NOT NULL attribute for 
Primary Key columns
486ee6f80 is described below

commit 486ee6f805b316b53f89dfa2de83a89892f57b02
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Jan 9 12:06:33 2025 +0800

    [FLINK-36956][transform] Append NOT NULL attribute for Primary Key columns
    
    This closes #3815
---
 docs/content.zh/docs/core-concept/transform.md     |   2 +
 docs/content/docs/core-concept/transform.md        |   2 +
 .../flink/FlinkPipelineComposerITCase.java         |  12 +-
 .../flink/FlinkPipelineComposerLenientITCase.java  |  12 +-
 .../flink/FlinkPipelineTransformITCase.java        | 189 ++++++++++++++++-----
 .../cdc/composer/flink/FlinkPipelineUdfITCase.java |  24 +--
 .../operators/transform/PostTransformOperator.java |   2 +-
 .../transform/TransformProjectionProcessor.java    |  15 +-
 .../transform/PostTransformOperatorTest.java       |  26 +--
 .../TransformOperatorWithSchemaEvolveTest.java     |  12 +-
 .../transform/UnifiedTransformOperatorTest.java    |  35 ++--
 11 files changed, 225 insertions(+), 106 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md 
b/docs/content.zh/docs/core-concept/transform.md
index 950b01833..892243628 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -263,6 +263,8 @@ transform:
     description: reassign composite primary keys example
 ```
 
+Notice that primary key columns will be attributed as NOT NULL in the 
downstream table, so you should ensure that no NULL value will be assigned to 
these columns.
+
 ## Reassign partition key
 We can reassign the partition key in transform rules. For example, given a 
table web_order in the database mydb, we may define a transform rule as follows:
 
diff --git a/docs/content/docs/core-concept/transform.md 
b/docs/content/docs/core-concept/transform.md
index 9a8cf5c61..b4979beca 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -263,6 +263,8 @@ transform:
     description: reassign composite primary keys example
 ```
 
+Notice that primary key columns will be attributed as NOT NULL in the 
downstream table, so you should ensure that no NULL value will be assigned to 
these columns.
+
 ## Reassign partition key
 We can reassign the partition key in transform rules. For example, given a 
table web_order in the database mydb, we may define a transform rule as follows:
 
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index 380baaa5d..c48f94402 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -350,7 +350,7 @@ class FlinkPipelineComposerITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, 
partitionKeys=col12, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, 
existedColumnName=col2}]}",
@@ -413,7 +413,7 @@ class FlinkPipelineComposerITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT 
NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING,`rk` STRING 
NOT NULL,`opts` BIGINT NOT NULL}, primaryKeys=col1, partitionKeys=col12, 
options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 10, +I, 1], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, 
existedColumnName=col2}]}",
@@ -485,7 +485,7 @@ class FlinkPipelineComposerITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, 
partitionKeys=col12, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, 
existedColumnName=col2}]}",
@@ -1024,7 +1024,7 @@ class FlinkPipelineComposerITCase {
         assertThat(mergedTableSchema)
                 .isEqualTo(
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.BIGINT())
+                                .physicalColumn("id", 
DataTypes.BIGINT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("last_name", 
DataTypes.STRING())
@@ -1035,12 +1035,12 @@ class FlinkPipelineComposerITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.merged, 
schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.merged, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[1, Alice, 18, last_name], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, 
Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.merged, 
addedColumns=[ColumnWithPosition{column=`description` STRING, position=AFTER, 
existedColumnName=last_name}]}",
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, 
typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, 
typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, 
Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
index 358094aa6..ad541f447 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
@@ -394,7 +394,7 @@ class FlinkPipelineComposerLenientITCase {
         String[] outputEvents = 
outCaptor.toString().trim().split(LINE_SEPARATOR);
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, 
partitionKeys=col12, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 10], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 20], op=INSERT, meta=({op_ts=2})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, 
existedColumnName=null}]}",
@@ -455,7 +455,7 @@ class FlinkPipelineComposerLenientITCase {
         String[] outputEvents = 
outCaptor.toString().trim().split(LINE_SEPARATOR);
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING,`rk` STRING NOT 
NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING,`rk` STRING 
NOT NULL}, primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 10, +I], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 20, +I], op=INSERT, meta=({op_ts=2})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, 
existedColumnName=null}]}",
@@ -525,7 +525,7 @@ class FlinkPipelineComposerLenientITCase {
         String[] outputEvents = 
outCaptor.toString().trim().split(LINE_SEPARATOR);
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`col12` STRING}, primaryKeys=col1, 
partitionKeys=col12, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, 
existedColumnName=null}]}",
@@ -1057,7 +1057,7 @@ class FlinkPipelineComposerLenientITCase {
         assertThat(mergedTableSchema)
                 .isEqualTo(
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.BIGINT())
+                                .physicalColumn("id", 
DataTypes.BIGINT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("last_name", 
DataTypes.STRING())
@@ -1068,12 +1068,12 @@ class FlinkPipelineComposerLenientITCase {
         String[] outputEvents = 
outCaptor.toString().trim().split(LINE_SEPARATOR);
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.merged, 
schema=columns={`id` INT,`name` STRING,`age` INT,`last_name` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.merged, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`last_name` STRING}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[1, Alice, 18, last_name], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[2, Bob, 20, last_name], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[2, 
Bob, 20, last_name], after=[2, Bob, 30, last_name], op=UPDATE, meta=()}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.merged, 
addedColumns=[ColumnWithPosition{column=`description` STRING, position=LAST, 
existedColumnName=null}]}",
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, 
typeMapping={id=BIGINT}, oldTypeMapping={id=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.merged, 
typeMapping={id=BIGINT NOT NULL}, oldTypeMapping={id=INT NOT NULL}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[3, Charlie, 15, last_name, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[], 
after=[4, Donald, 25, last_name, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.merged, before=[4, 
Donald, 25, last_name, student], after=[], op=DELETE, meta=()}",
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 6a775e74a..561097286 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -141,11 +141,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`uid` STRING,`double_age` 
INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`uid` 
STRING,`double_age` INT}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, 1Alice, 36], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, 2Bob, 40], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, 2Bob, 40], after=[2, Bob, 30, 2Bob, 60], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`uid` STRING,`double_age` INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`uid` STRING,`double_age` INT}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, 3Carol, 30], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, 4Derrida, 50], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, 4Derrida, 50], after=[], op=DELETE, meta=()}"));
@@ -169,11 +169,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`cubic_age` INT}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`cubic_age` INT}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, 5832], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, 8000], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, 8000], after=[2, Bob, 30, 27000], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`cubic_age` INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`cubic_age` INT}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, 3375], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, 15625], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, 15625], after=[], op=DELETE, meta=()}"));
@@ -255,11 +255,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`category` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`category` STRING}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, YOUNG], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, OLD], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, OLD], after=[2, Bob, 30, OLD], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`category` STRING}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`category` STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, YOUNG], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, OLD], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}"));
@@ -290,11 +290,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, 
options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`age` INT,`roleName` STRING}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, 18, Alice], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, 20, Bob], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
20, Bob], after=[2, 30, Bob], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`age` TINYINT,`roleName` STRING}, primaryKeys=id, 
options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`age` TINYINT,`roleName` STRING}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, 15, Juvenile], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, 25, Derrida], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
25, Derrida], after=[], op=DELETE, meta=()}"));
@@ -329,7 +329,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`roleName` STRING}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}"));
@@ -364,7 +364,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` 
STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
@@ -387,11 +387,11 @@ class FlinkPipelineTransformITCase {
                                 "Just a Transform Block",
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, 
partitionKeys=id, options=({bucket=17, replication_num=1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT}, 
primaryKeys=id;name, partitionKeys=id, options=({bucket=17, 
replication_num=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, 
replication_num=1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age` 
TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, 
options=({bucket=17, replication_num=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -444,11 +444,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING 
NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`__namespace_name__` 
STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT 
NULL}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, 
default_namespace, default_schema, mytable1], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` 
TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT 
NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT 
NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, default_namespace, default_schema, mytable2], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, default_namespace, default_schema, mytable2], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, default_namespace, default_schema, mytable2], after=[], op=DELETE, 
meta=()}"));
@@ -471,11 +471,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING 
NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`__namespace_name__` 
STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT 
NULL}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, default_namespace, default_schema, mytable1], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, default_namespace, default_schema, mytable1], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, default_namespace, default_schema, mytable1], after=[2, Bob, 30, 
default_namespace, default_schema, mytable1], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT 
NULL,`__table_name__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`__namespace_name__` STRING NOT 
NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT NULL}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, default_namespace, default_schema, mytable2], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, default_namespace, default_schema, mytable2], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, default_namespace, default_schema, mytable2], after=[], 
op=DELETE, meta=()}"));
@@ -502,11 +502,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`string_literal` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`string_literal` 
STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, __namespace_name____schema_name____table_name__], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, __namespace_name____schema_name____table_name__], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, __namespace_name____schema_name____table_name__], after=[2, Bob, 30, 
__namespace_name____schema_name____table_name__], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`string_literal` STRING}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`string_literal` STRING}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, __namespace_name____schema_name____table_name__], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, 
__namespace_name____schema_name____table_name__], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, __namespace_name____schema_name____table_name__], 
after=[], op=DELETE, meta=()}"));
@@ -529,11 +529,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 "SOFT_DELETE")),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`__namespace_name__` STRING 
NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT 
NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`__namespace_name__` 
STRING NOT NULL,`__schema_name__` STRING NOT NULL,`__table_name__` STRING NOT 
NULL,`__data_event_type__` STRING NOT NULL}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, default_namespace, default_schema, mytable1, +I], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, default_namespace, default_schema, mytable1, +I], op=INSERT, 
meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, default_namespace, default_schema, mytable1, -U], after=[2, Bob, 30, 
default_namespace, default_schema, mytable1, +U], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` 
TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT 
NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`__namespace_name__` STRING NOT NULL,`__schema_name__` STRING NOT 
NULL,`__table_name__` STRING NOT NULL,`__data_event_type__` STRING NOT NULL}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, default_namespace, default_schema, mytable2, +I], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, default_namespace, default_schema, mytable2, +I], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, default_namespace, default_schema, mytable2, -D], 
op=INSERT, meta=()}"));
@@ -562,11 +562,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` 
BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` 
BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` 
BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` 
BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` 
BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` 
BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, false, true, false, false, true, true, false, true, true, 
true, true, false, true, false], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, true, true, false, true, true, true, false, true, true, 
false, false, false, false, true], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, true, true, false, true, true, true, false, true, true, false, false, 
false, false, true], after=[2, Bob, 30, true, true, false, true, true, true, 
false, true, true, false, false, false, false, true], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col3` BOOLEAN,`col4` BOOLEAN,`col5` 
BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` 
BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` BOOLEAN,`col14` BOOLEAN}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col3` 
BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` 
BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN,`col11` BOOLEAN,`col12` BOOLEAN,`col13` 
BOOLEAN,`col14` BOOLEAN}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, false, true, true, true, false, true, false, 
true, true, false, false, true, true, false], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, false, true, true, true, false, true, false, 
true, false, false, false, false, false, true], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, false, true, true, true, false, true, false, true, false, 
false, false, false, false, true], after=[], op=DELETE, meta=()}"));
@@ -595,11 +595,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` 
BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` 
BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` BOOLEAN,`col2` 
BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` 
BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, true, true, true, false, false, true, false, true, false], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, true, true, false, false, true, true, false, true, false], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, true, true, false, false, true, true, false, true, false], after=[2, 
Bob, 30, true, true, false, false, true, true, false, true, false], op=UPDATE, 
meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col4` BOOLEAN,`col5` BOOLEAN,`col6` 
BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` BOOLEAN,`col10` BOOLEAN}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`col1` BOOLEAN,`col2` BOOLEAN,`col4` 
BOOLEAN,`col5` BOOLEAN,`col6` BOOLEAN,`col7` BOOLEAN,`col8` BOOLEAN,`col9` 
BOOLEAN,`col10` BOOLEAN}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, true, true, false, false, true, true, false, 
false, false], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, true, true, false, false, true, true, true, 
true, false], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, true, true, false, false, true, true, true, true, false], 
after=[], op=DELETE, meta=()}"));
@@ -629,11 +629,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`col1` INT,`col2` INT,`col3` 
INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` DOUBLE,`col9` 
DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` INT,`col2` 
INT,`col3` INT,`col4` DOUBLE,`col5` INT,`col6` INT,`col7` DOUBLE,`col8` 
DOUBLE,`col9` DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, 18, -16, 17, 0.5882352941176471, 1, 16, 1.0, 0.0, 1.0, 
36], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], after=[2, 
Bob, 30, 19, -15, 34, 1.1764705882352942, 2, 15, 2.0, 1.0, 1.0, 36], op=UPDATE, 
meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`col1` BIGINT,`col2` BIGINT,`col3` BIGINT,`col4` DOUBLE,`col5` 
INT,`col6` BIGINT,`col7` DOUBLE,`col8` DOUBLE,`col9` DOUBLE,`col10` INT}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`col1` BIGINT,`col2` BIGINT,`col3` BIGINT,`col4` 
DOUBLE,`col5` INT,`col6` BIGINT,`col7` DOUBLE,`col8` DOUBLE,`col9` 
DOUBLE,`col10` INT}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, 20, -14, 51, 1.7647058823529411, 0, 14, 2.0, 1.0, 
2.0, 36], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 
2.0, 2.0, 36], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, 21, -13, 68, 2.3529411764705883, 1, 13, 3.0, 2.0, 2.0, 
36], after=[], op=DELETE, meta=()}"));
@@ -666,11 +666,11 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`col1` STRING,`col2` 
INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` 
STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, 
options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`col1` STRING,`col2` 
INT,`col3` STRING,`col4` STRING,`col5` STRING,`col6` STRING,`col7` 
STRING,`col8` STRING,`col9` STRING,`col10` STRING}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, Dear Alice, 5, ALICE, alice, Alice, **ice, A, l, ice, 
Alice - 1], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], 
op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], after=[2, Bob, 30, 
Dear Bob, 3, BOB, bob, Bob, **b, B, o, b, Bob - 2], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` STRING,`col5` 
STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` STRING,`col10` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`col1` STRING,`col2` INT,`col3` STRING,`col4` 
STRING,`col5` STRING,`col6` STRING,`col7` STRING,`col8` STRING,`col9` 
STRING,`col10` STRING}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, Dear Carol, 5, CAROL, carol, Carol, Carol, C, a, 
rol, Carol - 3], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, 
Derrida, D, e, rrida, Derrida - 4], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, Dear Derrida, 7, DERRIDA, derrida, Derrida, Derrida, D, 
e, rrida, Derrida - 4], after=[], op=DELETE, meta=()}"));
@@ -1078,7 +1078,7 @@ class FlinkPipelineTransformITCase {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, 
options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 21], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 22], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 23], op=INSERT, meta=()}",
@@ -1172,7 +1172,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`extend_id` STRING}, primaryKeys=id, 
options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`extend_id` STRING}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 1 -> Alice], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 2 -> Barcarolle], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 3 -> Cecily], op=INSERT, meta=()}",
@@ -1256,7 +1256,7 @@ class FlinkPipelineTransformITCase {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`extend_id` STRING}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extend_id` STRING}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 21, 1 -> Alice], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 22, 2 -> Barcarolle], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 23, 3 -> Cecily], op=INSERT, meta=()}",
@@ -1351,7 +1351,7 @@ class FlinkPipelineTransformITCase {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`extend_id` STRING,`id` INT,`name` STRING,`age` INT}, 
primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`extend_id` STRING,`id` INT NOT NULL,`name` STRING,`age` INT}, 
primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1 -> Alice, 1, Alice, 21], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2 -> Barcarolle, 2, Barcarolle, 22], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3 -> Cecily, 3, Cecily, 23], op=INSERT, meta=()}",
@@ -1582,6 +1582,101 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    void testExplicitPrimaryKeyWithNullable() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId tableId = TableId.tableId("default_namespace", 
"default_schema", "mytable1");
+        List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+        
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        
"default_namespace.default_schema.mytable1",
+                                        null,
+                                        null,
+                                        "name",
+                                        "id,name",
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactly(
+                        // Initial stage
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name, 
partitionKeys=id;name, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 21], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 23], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, 
Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+                        // Add column stage
+                        
"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, 
addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, 
existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, 
position=AFTER, existedColumnName=age}]}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+                        // Alter column type stage
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, 
meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Rename column stage
+                        
"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, 
nameMapping={gender=biological_sex, age=toshi}}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, 
meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Drop column stage
+                        
"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, 
droppedColumnNames=[biological_sex, toshi]}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[10th, 13, Munroe], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[11th, 14, Neko], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[12th, 15, Oops], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+    }
+
     @Test
     void testTransformWithCommentsAndDefaultExpr() throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
@@ -1660,11 +1755,11 @@ class FlinkPipelineTransformITCase {
 
         Assertions.assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT 'id column' 'AUTO_INCREMENT()',`name` STRING 'name 
column' 'Jane Doe',`age` INT 'age column' '17',`new_name` STRING 'name column' 
'Jane Doe',`new_age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, 
age, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL 'id column' 'AUTO_INCREMENT()',`name` STRING 
'name column' 'Jane Doe',`age` INT 'age column' '17',`new_name` STRING 'name 
column' 'Jane Doe',`new_age` INT,`extras` STRING}, primaryKeys=id, 
partitionKeys=id, age, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, Alice, 19, extras], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, Bob, 21, extras], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, Bob, 21, extras], after=[2, Bob, 30, Bob, 31, extras], op=UPDATE, 
meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT 'column for id' 'AUTO_DECREMENT()',`name` 
VARCHAR(255) 'column for name' 'John Smith',`age` TINYINT 'column for age' 
'91',`description` STRING 'column for descriptions' 'not important',`new_name` 
VARCHAR(255) 'column for name' 'John Smith',`new_age` INT,`extras` STRING}, 
primaryKeys=id, partitionKeys=id, name, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL 'column for id' 'AUTO_DECREMENT()',`name` 
VARCHAR(255) 'column for name' 'John Smith',`age` TINYINT 'column for age' 
'91',`description` STRING 'column for descriptions' 'not important',`new_name` 
VARCHAR(255) 'column for name' 'John Smith',`new_age` INT,`extras` STRING}, 
primaryKeys=id, partitionKeys=id, name, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, Carol, 16, extras], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, Derrida, 26, extras], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, Derrida, 26, extras], after=[], op=DELETE, meta=()}");
@@ -1749,7 +1844,7 @@ class FlinkPipelineTransformITCase {
     void testNumericCastingsWithTruncation() throws Exception {
         assertThat(runNumericCastingWith("*"))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` SMALLINT,`int_c` 
INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` DOUBLE,`decimal_c` DECIMAL(10, 
2),`valid_char_c` VARCHAR(17),`invalid_char_c` VARCHAR(17)}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` TINYINT,`small_c` 
SMALLINT,`int_c` INT,`bigint_c` BIGINT,`float_c` FLOAT,`double_c` 
DOUBLE,`decimal_c` DECIMAL(10, 2),`valid_char_c` VARCHAR(17),`invalid_char_c` 
VARCHAR(17)}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -6.7, -8.9, -10.11, -12.13, foo], op=INSERT, 
meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0.0, 0.0, 0.00, 0, bar], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 6.7, 8.9, 10.11, 12.13, baz], op=INSERT, meta=()}",
@@ -1757,7 +1852,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("BOOLEAN")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` 
BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` 
BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` BOOLEAN,`small_c` BOOLEAN,`int_c` 
BOOLEAN,`bigint_c` BOOLEAN,`float_c` BOOLEAN,`double_c` BOOLEAN,`decimal_c` 
BOOLEAN,`valid_char_c` BOOLEAN,`invalid_char_c` BOOLEAN}, primaryKeys=id, 
options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, true, true, true, true, true, true, true, false, false], op=INSERT, 
meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, false, false, false, false, false, false, false, false, false], 
op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, true, true, true, true, true, true, true, false, false], op=INSERT, 
meta=()}",
@@ -1765,7 +1860,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("TINYINT")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` 
TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` 
TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` TINYINT,`small_c` TINYINT,`int_c` 
TINYINT,`bigint_c` TINYINT,`float_c` TINYINT,`double_c` TINYINT,`decimal_c` 
TINYINT,`valid_char_c` TINYINT,`invalid_char_c` TINYINT}, primaryKeys=id, 
options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}",
@@ -1773,7 +1868,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("SMALLINT")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` SMALLINT,`small_c` SMALLINT,`int_c` 
SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` SMALLINT,`decimal_c` 
SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` SMALLINT}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` SMALLINT,`small_c` 
SMALLINT,`int_c` SMALLINT,`bigint_c` SMALLINT,`float_c` SMALLINT,`double_c` 
SMALLINT,`decimal_c` SMALLINT,`valid_char_c` SMALLINT,`invalid_char_c` 
SMALLINT}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}",
@@ -1781,7 +1876,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("INT")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` INT,`small_c` INT,`int_c` INT,`bigint_c` 
INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` 
INT,`invalid_char_c` INT}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` INT,`small_c` INT,`int_c` 
INT,`bigint_c` INT,`float_c` INT,`double_c` INT,`decimal_c` INT,`valid_char_c` 
INT,`invalid_char_c` INT}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}",
@@ -1789,7 +1884,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("BIGINT")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` 
BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` 
BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` BIGINT,`small_c` BIGINT,`int_c` 
BIGINT,`bigint_c` BIGINT,`float_c` BIGINT,`double_c` BIGINT,`decimal_c` 
BIGINT,`valid_char_c` BIGINT,`invalid_char_c` BIGINT}, primaryKeys=id, 
options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -6, -8, -10, -12, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 6, 8, 10, 12, null], op=INSERT, meta=()}",
@@ -1797,7 +1892,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("FLOAT")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` 
FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` 
FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` FLOAT,`small_c` FLOAT,`int_c` 
FLOAT,`bigint_c` FLOAT,`float_c` FLOAT,`double_c` FLOAT,`decimal_c` 
FLOAT,`valid_char_c` FLOAT,`invalid_char_c` FLOAT}, primaryKeys=id, 
options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.11, -12.13, null], 
op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.11, 12.13, null], op=INSERT, 
meta=()}",
@@ -1805,7 +1900,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("DOUBLE")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` 
DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` 
DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, 
options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` DOUBLE,`small_c` DOUBLE,`int_c` 
DOUBLE,`bigint_c` DOUBLE,`float_c` DOUBLE,`double_c` DOUBLE,`decimal_c` 
DOUBLE,`valid_char_c` DOUBLE,`invalid_char_c` DOUBLE}, primaryKeys=id, 
options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2.0, -3.0, -4.0, -5.0, -6.699999809265137, -8.9, -10.11, -12.13, 
null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2.0, 3.0, 4.0, 5.0, 6.699999809265137, 8.9, 10.11, 12.13, null], 
op=INSERT, meta=()}",
@@ -1813,7 +1908,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("DECIMAL(1, 0)")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` DECIMAL(1, 0),`small_c` DECIMAL(1, 
0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` DECIMAL(1, 
0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` DECIMAL(1, 
0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(1, 0),`small_c` 
DECIMAL(1, 0),`int_c` DECIMAL(1, 0),`bigint_c` DECIMAL(1, 0),`float_c` 
DECIMAL(1, 0),`double_c` DECIMAL(1, 0),`decimal_c` DECIMAL(1, 0),`valid_char_c` 
DECIMAL(1, 0),`invalid_char_c` DECIMAL(1, 0)}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -7, -9, null, null, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 7, 9, null, null, null], op=INSERT, meta=()}",
@@ -1821,7 +1916,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("DECIMAL(2, 0)")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` DECIMAL(2, 0),`small_c` DECIMAL(2, 
0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` DECIMAL(2, 
0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` DECIMAL(2, 
0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(2, 0),`small_c` 
DECIMAL(2, 0),`int_c` DECIMAL(2, 0),`bigint_c` DECIMAL(2, 0),`float_c` 
DECIMAL(2, 0),`double_c` DECIMAL(2, 0),`decimal_c` DECIMAL(2, 0),`valid_char_c` 
DECIMAL(2, 0),`invalid_char_c` DECIMAL(2, 0)}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2, -3, -4, -5, -7, -9, -10, -12, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0, 0, 0, 0, 0, 0, 0, 0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2, 3, 4, 5, 7, 9, 10, 12, null], op=INSERT, meta=()}",
@@ -1829,7 +1924,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("DECIMAL(3, 1)")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` DECIMAL(3, 1),`small_c` DECIMAL(3, 
1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` DECIMAL(3, 
1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` DECIMAL(3, 
1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(3, 1),`small_c` 
DECIMAL(3, 1),`int_c` DECIMAL(3, 1),`bigint_c` DECIMAL(3, 1),`float_c` 
DECIMAL(3, 1),`double_c` DECIMAL(3, 1),`decimal_c` DECIMAL(3, 1),`valid_char_c` 
DECIMAL(3, 1),`invalid_char_c` DECIMAL(3, 1)}, primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2.0, -3.0, -4.0, -5.0, -6.7, -8.9, -10.1, -12.1, null], op=INSERT, 
meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2.0, 3.0, 4.0, 5.0, 6.7, 8.9, 10.1, 12.1, null], op=INSERT, meta=()}",
@@ -1837,7 +1932,7 @@ class FlinkPipelineTransformITCase {
 
         assertThat(runNumericCastingWith(generateCastTo("DECIMAL(19, 10)")))
                 .containsExactly(
-                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT,`tiny_c` DECIMAL(19, 10),`small_c` DECIMAL(19, 
10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` DECIMAL(19, 
10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 10),`valid_char_c` 
DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, primaryKeys=id, options=()}",
+                        "CreateTableEvent{tableId=ns.scm.tbl, 
schema=columns={`id` BIGINT NOT NULL,`tiny_c` DECIMAL(19, 10),`small_c` 
DECIMAL(19, 10),`int_c` DECIMAL(19, 10),`bigint_c` DECIMAL(19, 10),`float_c` 
DECIMAL(19, 10),`double_c` DECIMAL(19, 10),`decimal_c` DECIMAL(19, 
10),`valid_char_c` DECIMAL(19, 10),`invalid_char_c` DECIMAL(19, 10)}, 
primaryKeys=id, options=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[-1, -2.0000000000, -3.0000000000, -4.0000000000, -5.0000000000, 
-6.7000000000, -8.9000000000, -10.1100000000, -12.1300000000, null], op=INSERT, 
meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[0, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 0.0000000000, 
0.0000000000, 0.0000000000, 0.0000000000, null], op=INSERT, meta=()}",
                         "DataChangeEvent{tableId=ns.scm.tbl, before=[], 
after=[1, 2.0000000000, 3.0000000000, 4.0000000000, 5.0000000000, 6.7000000000, 
8.9000000000, 10.1100000000, 12.1300000000, null], op=INSERT, meta=()}",
@@ -1904,7 +1999,7 @@ class FlinkPipelineTransformITCase {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT,`int_max` 
INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` 
BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`int_max` 
INT,`greater_than_int_max` BIGINT,`int_min` INT,`less_than_int_min` 
BIGINT,`really_big_decimal` DECIMAL(19, 0)}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 22, 2147483647, 2147483648, -2147483648, -2147483649, 
1234567890123456789], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 
1234567890123456789], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, 
Cecily, 23, 2147483647, 2147483648, -2147483648, -2147483649, 
1234567890123456789], after=[3, Colin, 24, 2147483647, 2147483648, -2147483648, 
-2147483649, 1234567890123456789], op=UPDATE, meta=()}",
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
index b97b4b6de..5cf424cd3 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineUdfITCase.java
@@ -165,7 +165,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`fmt` STRING}, 
primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, from 1 to z is lie], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, from 2 to z is lie], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, from 3 to z is lie], op=INSERT, meta=({op_ts=3})}",
@@ -236,7 +236,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`collen` STRING}, primaryKeys=col1, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`collen` STRING}, 
primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 3], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, 4], op=INSERT, meta=({op_ts=3})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, 
existedColumnName=col2}]}",
@@ -305,7 +305,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`tob` STRING,`toi` STRING,`tof` 
STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`tob` STRING,`toi` 
STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, 
meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, 
meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, 
meta=({op_ts=3})}",
@@ -377,7 +377,7 @@ public class FlinkPipelineUdfITCase {
         assertThat(outputEvents)
                 .contains("[ LifecycleFunction ] opened.")
                 .contains(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`stt` STRING}, primaryKeys=col1, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`stt` STRING}, 
primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, #0], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, #1], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, #2], op=INSERT, meta=({op_ts=3})}",
@@ -449,7 +449,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`ans` STRING}, primaryKeys=col1, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`ans` STRING}, 
primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, Forty-two], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, Forty-two], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, Forty-two], op=INSERT, meta=({op_ts=3})}",
@@ -534,7 +534,7 @@ public class FlinkPipelineUdfITCase {
         assertThat(outputEvents)
                 .contains("[ LifecycleFunction ] opened.")
                 .contains(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` 
STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`inccol` STRING,`typ` 
STRING,`fmt` STRING,`stt` STRING}, primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 3, Integer: 42, 1-42, #0], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 4, Integer: 42, 2-42, #1], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, 5, Integer: 42, 3-42, #2], op=INSERT, meta=({op_ts=3})}",
@@ -609,7 +609,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`fmt` STRING}, primaryKeys=col1, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`fmt` STRING}, 
primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, from 1 to z is lie], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, from 2 to z is lie], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, from 3 to z is lie], op=INSERT, meta=({op_ts=3})}",
@@ -679,7 +679,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`collen` STRING}, primaryKeys=col1, 
options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`collen` STRING}, 
primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 3], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, 4], op=INSERT, meta=({op_ts=3})}",
                         
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, 
existedColumnName=col2}]}",
@@ -747,7 +747,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`tob` STRING,`toi` STRING,`tof` 
STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`tob` STRING,`toi` 
STRING,`tof` STRING,`tos` STRING}, primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, 
meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, 
meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, Boolean: true, Integer: 1, Double: 3.14, String: str], op=INSERT, 
meta=({op_ts=3})}",
@@ -825,8 +825,8 @@ public class FlinkPipelineUdfITCase {
         // Check the order and content of all received events
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
-                .contains(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`inccol` STRING,`typ` STRING,`fmt` 
STRING}, primaryKeys=col1, options=({key1=value1})}",
+                .containsExactly(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`inccol` STRING,`typ` 
STRING,`fmt` STRING}, primaryKeys=col1, options=({key1=value1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 3, Integer: 42, 1-42], op=INSERT, meta=({op_ts=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 4, Integer: 42, 2-42], op=INSERT, meta=({op_ts=2})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[3, 3, 5, Integer: 42, 3-42], op=INSERT, meta=({op_ts=3})}",
@@ -902,7 +902,7 @@ public class FlinkPipelineUdfITCase {
         String[] outputEvents = outCaptor.toString().trim().split("\n");
         assertThat(outputEvents)
                 .contains(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING,`col2` STRING,`emb` STRING}, primaryKeys=col1, 
options=({key1=value1})}")
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`emb` STRING}, 
primaryKeys=col1, options=({key1=value1})}")
                 // The result of transform by model is not fixed.
                 .hasSize(9);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index d60721027..c565b5610 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -373,7 +373,7 @@ public class PostTransformOperator extends 
AbstractStreamOperator<Event>
                                     Tuple2.of(tableId, transformProjection));
                     // update the columns of projection and add the column of 
projection into Schema
                     newSchemas.add(
-                            postTransformProcessor.processSchemaChangeEvent(
+                            postTransformProcessor.processSchema(
                                     schema, 
transform.getSupportedMetadataColumns()));
                 }
             }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
index abedd9120..dc8c099d6 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
@@ -30,9 +30,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -124,8 +126,7 @@ public class TransformProjectionProcessor {
                 supportedMetadataColumns);
     }
 
-    public Schema processSchemaChangeEvent(
-            Schema schema, SupportedMetadataColumn[] supportedMetadataColumns) 
{
+    public Schema processSchema(Schema schema, SupportedMetadataColumn[] 
supportedMetadataColumns) {
         List<ProjectionColumn> projectionColumns =
                 TransformParser.generateProjectionColumns(
                         transformProjection.getProjection(),
@@ -133,12 +134,22 @@ public class TransformProjectionProcessor {
                         udfDescriptors,
                         supportedMetadataColumns);
         transformProjection.setProjectionColumns(projectionColumns);
+        Set<String> primaryKeys = new HashSet<>(schema.primaryKeys());
         return schema.copy(
                 projectionColumns.stream()
                         .map(ProjectionColumn::getColumn)
+                        .map(column -> setPkNonNull(primaryKeys, column))
                         .collect(Collectors.toList()));
     }
 
+    private static Column setPkNonNull(Set<String> primaryKeys, Column column) 
{
+        if (primaryKeys.contains(column.getName())) {
+            return column.copy(column.getType().notNull());
+        } else {
+            return column;
+        }
+    }
+
     public BinaryRecordData processData(
             BinaryRecordData payload, long epochTime, String opType, 
Map<String, String> meta) {
         List<Object> valueList = new ArrayList<>();
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 4d8733845..a7dc61ac4 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -45,7 +45,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "customers");
     private static final Schema CUSTOMERS_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("col2", DataTypes.STRING())
                     .physicalColumn("col12", DataTypes.STRING())
                     .primaryKey("col1")
@@ -55,7 +55,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "data_types");
     private static final Schema DATATYPE_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("colString", DataTypes.STRING())
+                    .physicalColumn("colString", DataTypes.STRING().notNull())
                     .physicalColumn("colBoolean", DataTypes.BOOLEAN())
                     .physicalColumn("colTinyint", DataTypes.TINYINT())
                     .physicalColumn("colSmallint", DataTypes.SMALLINT())
@@ -74,12 +74,12 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "metadata_table");
     private static final Schema METADATA_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .primaryKey("col1")
                     .build();
     private static final Schema EXPECTED_METADATA_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("identifier_name", DataTypes.STRING())
                     .physicalColumn("__namespace_name__", 
DataTypes.STRING().notNull())
                     .physicalColumn("__schema_name__", 
DataTypes.STRING().notNull())
@@ -91,7 +91,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "metadata_as_table");
     private static final Schema METADATA_AS_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("sid", DataTypes.INT())
+                    .physicalColumn("sid", DataTypes.INT().notNull())
                     .physicalColumn("name", DataTypes.STRING())
                     .physicalColumn("name_upper", DataTypes.STRING())
                     .physicalColumn("tbname", DataTypes.STRING().notNull())
@@ -102,7 +102,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "timestamp_table");
     private static final Schema TIMESTAMP_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("time_equal", DataTypes.INT())
                     .physicalColumn("timestamp_equal", DataTypes.INT())
                     .physicalColumn("date_equal", DataTypes.INT())
@@ -113,7 +113,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "timestampdiff_table");
     private static final Schema TIMESTAMPDIFF_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("second_diff", DataTypes.INT())
                     .physicalColumn("minute_diff", DataTypes.INT())
                     .physicalColumn("hour_diff", DataTypes.INT())
@@ -125,7 +125,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "data_null");
     private static final Schema NULL_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("colString", DataTypes.STRING())
                     .physicalColumn("nullInt", DataTypes.INT())
                     .physicalColumn("nullBoolean", DataTypes.BOOLEAN())
@@ -145,7 +145,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "data_cast");
     private static final Schema CAST_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("castInt", DataTypes.INT())
                     .physicalColumn("castBoolean", DataTypes.BOOLEAN())
                     .physicalColumn("castTinyint", DataTypes.TINYINT())
@@ -164,7 +164,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "timezone_table");
     private static final Schema TIMEZONE_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("datetime", DataTypes.STRING())
                     .primaryKey("col1")
                     .build();
@@ -173,7 +173,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "condition_table");
     private static final Schema CONDITION_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.STRING())
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
                     .physicalColumn("condition_result", DataTypes.BOOLEAN())
                     .primaryKey("col1")
                     .build();
@@ -232,7 +232,7 @@ public class PostTransformOperatorTest {
             TableId.tableId("my_company", "my_branch", "column_square");
     private static final Schema COLUMN_SQUARE_SCHEMA =
             Schema.newBuilder()
-                    .physicalColumn("col1", DataTypes.INT())
+                    .physicalColumn("col1", DataTypes.INT().notNull())
                     .physicalColumn("col2", DataTypes.INT())
                     .physicalColumn("square_col2", DataTypes.INT())
                     .primaryKey("col1")
@@ -604,7 +604,7 @@ public class PostTransformOperatorTest {
                         RegularEventOperatorTestHarness.with(transform, 1);
         Schema expectedSchema =
                 Schema.newBuilder()
-                        .physicalColumn("col1", DataTypes.STRING())
+                        .physicalColumn("col1", DataTypes.STRING().notNull())
                         .physicalColumn("identifier_name", DataTypes.STRING())
                         .physicalColumn("__namespace_name__", 
DataTypes.STRING().notNull())
                         .physicalColumn("__schema_name__", 
DataTypes.STRING().notNull())
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
index ffc6da850..50521e2a5 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
@@ -245,7 +245,7 @@ public class TransformOperatorWithSchemaEvolveTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("computed", DataTypes.INT())
                                 .primaryKey("id")
@@ -302,7 +302,7 @@ public class TransformOperatorWithSchemaEvolveTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("computed", DataTypes.INT())
                                 .primaryKey("id")
@@ -362,7 +362,7 @@ public class TransformOperatorWithSchemaEvolveTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("computed", DataTypes.INT())
@@ -412,7 +412,7 @@ public class TransformOperatorWithSchemaEvolveTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("computed", DataTypes.INT())
@@ -547,7 +547,7 @@ public class TransformOperatorWithSchemaEvolveTest {
                                 .build(),
                         Schema.newBuilder()
                                 .physicalColumn("computed1", DataTypes.INT())
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("computed2", DataTypes.INT())
@@ -682,7 +682,7 @@ public class TransformOperatorWithSchemaEvolveTest {
                                 .build(),
                         Schema.newBuilder()
                                 .physicalColumn("computed", DataTypes.INT())
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .primaryKey("id")
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index 7a0482706..59abfa06a 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -350,7 +350,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("computed", DataTypes.INT())
                                 .primaryKey("id")
@@ -396,7 +396,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", 
DataTypes.STRING().notNull())
                                 .physicalColumn("age", 
DataTypes.INT().notNull())
                                 .physicalColumn("computed", DataTypes.INT())
@@ -515,7 +515,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("computed", DataTypes.INT())
@@ -562,7 +562,7 @@ public class UnifiedTransformOperatorTest {
                                 .build(),
                         Schema.newBuilder()
                                 .physicalColumn("computed", DataTypes.INT())
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .primaryKey("id")
@@ -611,7 +611,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("__namespace_name__", 
DataTypes.STRING().notNull())
@@ -665,7 +665,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("identifier_name", 
DataTypes.STRING())
@@ -714,7 +714,7 @@ public class UnifiedTransformOperatorTest {
                                 .build(),
                         Schema.newBuilder()
                                 .physicalColumn("identifier_name", 
DataTypes.STRING())
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .primaryKey("id")
@@ -765,7 +765,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .physicalColumn("identifier_name", 
DataTypes.STRING())
@@ -855,7 +855,7 @@ public class UnifiedTransformOperatorTest {
                                 .physicalColumn("__namespace_name__", 
DataTypes.STRING().notNull())
                                 .physicalColumn("__schema_name__", 
DataTypes.STRING().notNull())
                                 .physicalColumn("__table_name__", 
DataTypes.STRING().notNull())
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", DataTypes.STRING())
                                 .physicalColumn("age", DataTypes.INT())
                                 .primaryKey("id")
@@ -942,7 +942,7 @@ public class UnifiedTransformOperatorTest {
                                 .primaryKey("id")
                                 .build(),
                         Schema.newBuilder()
-                                .physicalColumn("id", DataTypes.INT())
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
                                 .physicalColumn("name", 
DataTypes.STRING().notNull())
                                 .physicalColumn("age", 
DataTypes.INT().notNull())
                                 .physicalColumn("computed", DataTypes.INT())
@@ -1065,7 +1065,10 @@ public class UnifiedTransformOperatorTest {
                                 .build(),
                         Schema.newBuilder()
                                 .physicalColumn(
-                                        "id", DataTypes.INT(), "id column", 
"AUTO_INCREMENT()")
+                                        "id",
+                                        DataTypes.INT().notNull(),
+                                        "id column",
+                                        "AUTO_INCREMENT()")
                                 .physicalColumn(
                                         "name", DataTypes.STRING(), "name 
column", "John Smith")
                                 .physicalColumn("age", DataTypes.INT(), "age 
column", "17")
@@ -1098,7 +1101,10 @@ public class UnifiedTransformOperatorTest {
                                 .build(),
                         Schema.newBuilder()
                                 .physicalColumn(
-                                        "id", DataTypes.INT(), "id column", 
"AUTO_INCREMENT()")
+                                        "id",
+                                        DataTypes.INT().notNull(),
+                                        "id column",
+                                        "AUTO_INCREMENT()")
                                 .physicalColumn("age", DataTypes.INT(), "age 
column", "17")
                                 .physicalColumn("computed", DataTypes.INT())
                                 .primaryKey("id")
@@ -1145,7 +1151,10 @@ public class UnifiedTransformOperatorTest {
                                 .physicalColumn(
                                         "name", DataTypes.STRING(), "name 
column", "John Smith")
                                 .physicalColumn(
-                                        "id", DataTypes.INT(), "id column", 
"AUTO_INCREMENT()")
+                                        "id",
+                                        DataTypes.INT().notNull(),
+                                        "id column",
+                                        "AUTO_INCREMENT()")
                                 .primaryKey("id")
                                 .build())
                 .initializeHarness()

Reply via email to