This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c5cb59371 [INLONG-8116][Sort] Support table api config setting (#8117)
c5cb59371 is described below
commit c5cb5937188e1e6ab5b780b3074e0d9b0edeb98f
Author: Xin Gong <[email protected]>
AuthorDate: Wed May 31 12:23:16 2023 +0800
[INLONG-8116][Sort] Support table api config setting (#8117)
---
.../inlong/sort/configuration/Constants.java | 24 ++++++++++++++++++++++
.../main/java/org/apache/inlong/sort/Entrance.java | 4 ++++
2 files changed, 28 insertions(+)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 209443602..702dbef53 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -97,6 +97,30 @@ public class Constants {
*/
public static final String PIPELINE_NAME = "pipeline.name";
+    /**
+     * https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
+     * default AUTO
+     */
+    public static final String TABLE_EXEC_SINK_UPSERT_MATERIALIZE = "table.exec.sink.upsert-materialize";
+
+    public static final ConfigOption<String> UPSERT_MATERIALIZE =
+            key("table.exec.sink.upsert-materialize").defaultValue("AUTO").withDescription("Because of the disorder "
+                    + "of ChangeLog data caused by Shuffle in distributed system, the data received by Sink may not "
+                    + "be the order of global upsert. So add upsert materialize operator before upsert sink. It "
+                    + "receives the upstream changelog records and generate an upsert view for the downstream.\n"
+                    + "By default, the materialize operator will be added when a distributed disorder occurs on "
+                    + "unique keys. You can also choose no materialization(NONE) or force materialization(FORCE).");
+
+    public static final String TABLE_EXEC_SINK_NOT_NULL_ENFORCER = "table.exec.sink.not-null-enforcer";
+
+    public static final ConfigOption<String> NOT_NULL_ENFORCER =
+            key("table.exec.sink.not-null-enforcer").defaultValue("ERROR").withDescription("Determines how Flink "
+                    + "enforces NOT NULL column constraints when inserting null values.\n"
+                    + "Possible values:\n"
+                    + "\"ERROR\": Throw a runtime exception when writing null values into NOT NULL column.\n"
+                    + "\"DROP\": Drop records silently if a null value would have to be inserted into a NOT NULL "
+                    + "column.");
+
/**
* The ID of the cluster, used to separate multiple clusters.
*/
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index f5e02d73c..ae3ebc842 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -55,6 +55,10 @@ public class Entrance {
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME,
config.getString(Constants.JOB_NAME));
+        tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+                config.getString(Constants.UPSERT_MATERIALIZE));
+        tableEnv.getConfig().getConfiguration().setString(Constants.TABLE_EXEC_SINK_NOT_NULL_ENFORCER,
+                config.getString(Constants.NOT_NULL_ENFORCER));
String sqlFile = config.getString(Constants.SQL_SCRIPT_FILE);
Parser parser;
if (StringUtils.isEmpty(sqlFile)) {