This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
     new 5ebd7563f [Improve][Connector-V1][Spark-Hbase] Handling null values 
(#3426)
5ebd7563f is described below

commit 5ebd7563fe8cf105b523d22076eea0f7ffcb592e
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 13:55:54 2022 +0800

    [Improve][Connector-V1][Spark-Hbase] Handling null values (#3426)
    
    * Nullable write
    
    * Nullable write
    
    Co-authored-by: zhouyao <[email protected]>
    
    Co-authored-by: Carl-Zhou-CN 
<[email protected]>
    Co-authored-by: zhouyao <[email protected]>
---
 docs/en/connector/sink/Hbase.md                                  | 7 ++++++-
 .../src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala | 5 +++++
 .../main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala | 9 +++++++--
 3 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/docs/en/connector/sink/Hbase.md b/docs/en/connector/sink/Hbase.md
index 0c6e8336a..17fdef230 100644
--- a/docs/en/connector/sink/Hbase.md
+++ b/docs/en/connector/sink/Hbase.md
@@ -16,11 +16,12 @@ Engine Supported and plugin name
 ## Options
 
 | name                   | type   | required | default value |
-| ---------------------- | ------ | -------- | ------------- |
+|------------------------|--------|----------|---------------|
 | hbase.zookeeper.quorum | string | yes      |               |
 | catalog                | string | yes      |               |
 | staging_dir            | string | yes      |               |
 | save_mode              | string | no       | append        |
+| nullable               | bool   | no       | false         |
 | hbase.*                | string | no       |               |
 
 ### hbase.zookeeper.quorum [string]
@@ -41,6 +42,10 @@ Two write modes are supported, `overwrite` and `append` . 
`overwrite` means that
 
 `append` means that the original data of the `hbase table` will not be 
cleared, and the load operation will be performed directly.
 
+### nullable [bool]
+
+Whether null values are written to HBase
+
 ### hbase.* [string]
 
 Users can also specify multiple optional parameters. For a detailed list of 
parameters, see [Hbase Supported 
Parameters](https://hbase.apache.org/book.html#config.files).
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
index 493aba7e8..ba97bbe19 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
@@ -42,4 +42,9 @@ object Config extends Serializable {
    */
   val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
 
+  /**
+   * nullable
+   */
+  val NULLABLE = "nullable"
+
 }
diff --git 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
index 0e5c6d123..cfc166c98 100644
--- 
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
+++ 
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
-import org.apache.seatunnel.spark.hbase.Config.{CATALOG, 
HBASE_ZOOKEEPER_QUORUM, SAVE_MODE, STAGING_DIR}
+import org.apache.seatunnel.spark.hbase.Config.{CATALOG, 
HBASE_ZOOKEEPER_QUORUM, NULLABLE, SAVE_MODE, STAGING_DIR}
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.spark.internal.Logging
@@ -52,7 +52,8 @@ class Hbase extends SparkBatchSink with Logging {
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        SAVE_MODE -> HbaseSaveMode.Append.toString.toLowerCase))
+        SAVE_MODE -> HbaseSaveMode.Append.toString.toLowerCase,
+        NULLABLE -> false))
 
     config = config.withFallback(defaultConfig)
     hbaseConf = 
HBaseConfiguration.create(env.getSparkSession.sessionState.newHadoopConf())
@@ -73,6 +74,7 @@ class Hbase extends SparkBatchSink with Logging {
     val colNames = df.columns
     val catalog = config.getString(CATALOG)
     val stagingDir = config.getString(STAGING_DIR) + "/" + 
System.currentTimeMillis().toString
+    val nullable = config.getBoolean(NULLABLE)
 
     // convert all columns type to string
     for (colName <- colNames) {
@@ -118,6 +120,9 @@ class Hbase extends SparkBatchSink with Logging {
               val qualifier = c._2
               val value = r.getAs[String](c._3)
               if (value == null) {
+                if (nullable) {
+                  familyQualifiersValues += (family, qualifier, null)
+                }
                 break
               }
               familyQualifiersValues += (family, qualifier, 
Bytes.toBytes(value))

Reply via email to