This is an automated email from the ASF dual-hosted git repository.

fcsaky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kudu.git


The following commit(s) were added to refs/heads/main by this push:
     new 9705f06  [FLINK-37190] Make Kudu `FlushMode` configurable in Flink SQL
9705f06 is described below

commit 9705f063b5bfe99076e763727a2e0a77be33a5ad
Author: Ferenc Csaky <fcs...@apache.org>
AuthorDate: Tue Jan 21 13:47:48 2025 +0100

    [FLINK-37190] Make Kudu `FlushMode` configurable in Flink SQL
---
 .../flink/connector/kudu/table/KuduDynamicTableFactory.java       | 3 +++
 .../flink/connector/kudu/table/KuduDynamicTableOptions.java       | 8 ++++++++
 .../flink/connector/kudu/table/KuduDynamicTableFactoryTest.java   | 3 +++
 3 files changed, 14 insertions(+)

diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java
index a03cc2c..efd238a 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java
@@ -42,6 +42,7 @@ import java.util.Set;
 import static org.apache.flink.connector.kudu.table.KuduCommonOptions.KUDU_MASTERS;
 import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.IDENTIFIER;
 import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_FLUSH_INTERVAL;
+import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_FLUSH_MODE;
 import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_HASH_COLS;
 import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_HASH_PARTITION_NUMS;
 import static org.apache.flink.connector.kudu.table.KuduDynamicTableOptions.KUDU_IGNORE_DUPLICATE;
@@ -81,6 +82,7 @@ public class KuduDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                 KUDU_MAX_BUFFER_SIZE,
                 KUDU_MAX_BUFFER_SIZE,
                 KUDU_OPERATION_TIMEOUT,
+                KUDU_FLUSH_MODE,
                 KUDU_FLUSH_INTERVAL,
                 KUDU_IGNORE_NOT_FOUND,
                 KUDU_IGNORE_DUPLICATE,
@@ -107,6 +109,7 @@ public class KuduDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         final KuduWriterConfig.Builder configBuilder =
                 KuduWriterConfig.Builder.setMasters(config.get(KUDU_MASTERS))
                         .setOperationTimeout(config.get(KUDU_OPERATION_TIMEOUT).toMillis())
+                        .setConsistency(config.get(KUDU_FLUSH_MODE))
                         .setFlushInterval((int) config.get(KUDU_FLUSH_INTERVAL).toMillis())
                         .setMaxBufferSize(config.get(KUDU_MAX_BUFFER_SIZE))
                         .setIgnoreNotFound(config.get(KUDU_IGNORE_NOT_FOUND))
diff --git a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java
index 9b4f11b..11de5de 100644
--- a/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java
+++ b/flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableOptions.java
@@ -21,6 +21,8 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 
+import org.apache.kudu.client.SessionConfiguration;
+
 import java.time.Duration;
 
 /** Kudu table options. */
@@ -53,6 +55,12 @@ public class KuduDynamicTableOptions {
                     .defaultValue(1000)
                     .withDescription("kudu's max buffer size");
 
+    public static final ConfigOption<SessionConfiguration.FlushMode> KUDU_FLUSH_MODE =
+            ConfigOptions.key("kudu.flush-mode")
+                    .enumType(SessionConfiguration.FlushMode.class)
+                    .defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
+                    .withDescription("kudu's data flush mode");
+
     public static final ConfigOption<Duration> KUDU_FLUSH_INTERVAL =
             ConfigOptions.key("kudu.flush-interval")
                     .durationType()
diff --git a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java
index 46abe1a..cadf2ca 100644
--- a/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java
+++ b/flink-connector-kudu/src/test/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactoryTest.java
@@ -39,6 +39,7 @@ import org.apache.kudu.Type;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.RowResult;
+import org.apache.kudu.client.SessionConfiguration;
 import org.apache.kudu.shaded.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -207,11 +208,13 @@ public class KuduDynamicTableFactoryTest extends KuduTestBase {
         properties.put("kudu.table", "TestTable12");
         properties.put("kudu.ignore-not-found", "true");
         properties.put("kudu.ignore-duplicate", "true");
+        properties.put("kudu.flush-mode", "auto_flush_sync");
         properties.put("kudu.flush-interval", "10000");
         properties.put("kudu.max-buffer-size", "10000");
 
         KuduWriterConfig.Builder builder =
                 KuduWriterConfig.Builder.setMasters(kuduMasters)
+                        .setConsistency(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC)
                         .setFlushInterval(10000)
                         .setMaxBufferSize(10000)
                         .setIgnoreDuplicate(true)

Reply via email to