This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e43035421 [INLONG-4030][Sort] Import all changelog mode data ingest 
into Hive (#4065)
e43035421 is described below

commit e43035421ba98863b0864fb041101047465b61b8
Author: thexia <[email protected]>
AuthorDate: Fri May 6 18:50:31 2022 +0800

    [INLONG-4030][Sort] Import all changelog mode data ingest into Hive (#4065)
---
 .../sort/singletenant/flink/connectors/hive/HiveOptions.java       | 5 +++++
 .../sort/singletenant/flink/connectors/hive/HiveTableSink.java     | 7 +++++++
 2 files changed, 12 insertions(+)

diff --git 
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
 
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
index 14d17a5af..167b56895 100644
--- 
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
+++ 
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
@@ -62,4 +62,9 @@ public class HiveOptions {
                                     + "If it is true, using hadoop mapred 
record writer to write "
                                     + "parquet and orc files.");
 
+    public static final ConfigOption<Boolean> HIVE_IGNORE_ALL_CHANGELOG =
+            ConfigOptions.key("sink.ignore.changelog")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Regard upsert delete as insert kind.");
 }
diff --git 
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
 
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
index 1d5100bc0..14f2d4a71 100644
--- 
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
+++ 
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
@@ -22,6 +22,7 @@ import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_P
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
 import static 
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL;
 import static 
org.apache.flink.table.filesystem.stream.compact.CompactOperator.convertToUncompacted;
+import static 
org.apache.inlong.sort.singletenant.flink.connectors.hive.HiveOptions.HIVE_IGNORE_ALL_CHANGELOG;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -459,6 +460,12 @@ public class HiveTableSink implements DynamicTableSink, 
SupportsPartitioning, Su
 
     @Override
     public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+        if 
(org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+                .get(HIVE_IGNORE_ALL_CHANGELOG)) {
+            LOG.warn("Hive sink receive all changelog record. "
+                    + "Regard any other record as insert-only record.");
+            return ChangelogMode.all();
+        }
         return ChangelogMode.insertOnly();
     }
 

Reply via email to