This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e43035421 [INLONG-4030][Sort] Import all changelog mode data ingest
into Hive (#4065)
e43035421 is described below
commit e43035421ba98863b0864fb041101047465b61b8
Author: thexia <[email protected]>
AuthorDate: Fri May 6 18:50:31 2022 +0800
[INLONG-4030][Sort] Import all changelog mode data ingest into Hive (#4065)
---
.../sort/singletenant/flink/connectors/hive/HiveOptions.java | 5 +++++
.../sort/singletenant/flink/connectors/hive/HiveTableSink.java | 7 +++++++
2 files changed, 12 insertions(+)
diff --git
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
index 14d17a5af..167b56895 100644
---
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
+++
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveOptions.java
@@ -62,4 +62,9 @@ public class HiveOptions {
+ "If it is true, using hadoop mapred
record writer to write "
+ "parquet and orc files.");
+ public static final ConfigOption<Boolean> HIVE_IGNORE_ALL_CHANGELOG =
+ ConfigOptions.key("sink.ignore.changelog")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Regard upsert delete as insert kind.");
}
diff --git
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
index 1d5100bc0..14f2d4a71 100644
---
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
+++
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/connectors/hive/HiveTableSink.java
@@ -22,6 +22,7 @@ import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_P
import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_FILE_SIZE;
import static
org.apache.flink.table.filesystem.FileSystemOptions.SINK_ROLLING_POLICY_ROLLOVER_INTERVAL;
import static
org.apache.flink.table.filesystem.stream.compact.CompactOperator.convertToUncompacted;
+import static
org.apache.inlong.sort.singletenant.flink.connectors.hive.HiveOptions.HIVE_IGNORE_ALL_CHANGELOG;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -459,6 +460,12 @@ public class HiveTableSink implements DynamicTableSink,
SupportsPartitioning, Su
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+ if
(org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions())
+ .get(HIVE_IGNORE_ALL_CHANGELOG)) {
+ LOG.warn("Hive sink receive all changelog record. "
+ + "Regard any other record as insert-only record.");
+ return ChangelogMode.all();
+ }
return ChangelogMode.insertOnly();
}