This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c5cb59371 [INLONG-8116][Sort] Support table api config setting (#8117)
c5cb59371 is described below

commit c5cb5937188e1e6ab5b780b3074e0d9b0edeb98f
Author: Xin Gong <[email protected]>
AuthorDate: Wed May 31 12:23:16 2023 +0800

    [INLONG-8116][Sort] Support table api config setting (#8117)
---
 .../inlong/sort/configuration/Constants.java       | 24 ++++++++++++++++++++++
 .../main/java/org/apache/inlong/sort/Entrance.java |  4 ++++
 2 files changed, 28 insertions(+)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 209443602..702dbef53 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -97,6 +97,30 @@ public class Constants {
      */
     public static final String PIPELINE_NAME = "pipeline.name";
 
+    /**
+     * 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
+     * default AUTO
+     */
+    public static final String TABLE_EXEC_SINK_UPSERT_MATERIALIZE = 
"table.exec.sink.upsert-materialize";
+
+    public static final ConfigOption<String> UPSERT_MATERIALIZE =
+            
key("table.exec.sink.upsert-materialize").defaultValue("AUTO").withDescription("Because
 of the disorder "
+                    + "of ChangeLog data caused by Shuffle in distributed 
system, the data received by Sink may not "
+                    + "be the order of global upsert. So add upsert 
materialize operator before upsert sink. It "
+                    + "receives the upstream changelog records and generate an 
upsert view for the downstream.\n"
+                    + "By default, the materialize operator will be added when 
a distributed disorder occurs on "
+                    + "unique keys. You can also choose no 
materialization(NONE) or force materialization(FORCE).");
+
+    public static final String TABLE_EXEC_SINK_NOT_NULL_ENFORCER = 
"table.exec.sink.not-null-enforcer";
+
+    public static final ConfigOption<String> NOT_NULL_ENFORCER =
+            
key("table.exec.sink.not-null-enforcer").defaultValue("ERROR").withDescription("Determines
 how Flink "
+                    + "enforces NOT NULL column constraints when inserting 
null values.\n"
+                    + "Possible values:\n"
+                    + "\"ERROR\": Throw a runtime exception when writing null 
values into NOT NULL column.\n"
+                    + "\"DROP\": Drop records silently if a null value would 
have to be inserted into a NOT NULL "
+                    + "column.");
+
     /**
      * The ID of the cluster, used to separate multiple clusters.
      */
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index f5e02d73c..ae3ebc842 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -55,6 +55,10 @@ public class Entrance {
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
         
tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME,
                 config.getString(Constants.JOB_NAME));
+        
tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                config.getString(Constants.UPSERT_MATERIALIZE));
+        
tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
+                config.getString(Constants.NOT_NULL_ENFORCER));
         String sqlFile = config.getString(Constants.SQL_SCRIPT_FILE);
         Parser parser;
         if (StringUtils.isEmpty(sqlFile)) {

Reply via email to