This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
new 5ebd7563f [Improve][Connector-V1][Spark-Hbase] Handling null values
(#3426)
5ebd7563f is described below
commit 5ebd7563fe8cf105b523d22076eea0f7ffcb592e
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 13:55:54 2022 +0800
[Improve][Connector-V1][Spark-Hbase] Handling null values (#3426)
* Nullable write
* Nullable write
Co-authored-by: zhouyao <[email protected]>
Co-authored-by: Carl-Zhou-CN
<[email protected]>
Co-authored-by: zhouyao <[email protected]>
---
docs/en/connector/sink/Hbase.md | 7 ++++++-
.../src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala | 5 +++++
.../main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala | 9 +++++++--
3 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/docs/en/connector/sink/Hbase.md b/docs/en/connector/sink/Hbase.md
index 0c6e8336a..17fdef230 100644
--- a/docs/en/connector/sink/Hbase.md
+++ b/docs/en/connector/sink/Hbase.md
@@ -16,11 +16,12 @@ Engine Supported and plugin name
## Options
| name | type | required | default value |
-| ---------------------- | ------ | -------- | ------------- |
+|------------------------|--------| -------- |---------------|
| hbase.zookeeper.quorum | string | yes | |
| catalog | string | yes | |
| staging_dir | string | yes | |
| save_mode | string | no | append |
+| nullable | bool | no | false |
| hbase.* | string | no | |
### hbase.zookeeper.quorum [string]
@@ -41,6 +42,10 @@ Two write modes are supported, `overwrite` and `append` .
`overwrite` means that the original data of the `hbase table` will be
cleared before writing. `append` means that the original data of the
`hbase table` will not be cleared, and the load operation will be
performed directly.
+### nullable [bool]
+
+Whether null values are written to HBase
+
### hbase.* [string]
Users can also specify multiple optional parameters. For a detailed list of
parameters, see [Hbase Supported
Parameters](https://hbase.apache.org/book.html#config.files).
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
index 493aba7e8..ba97bbe19 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/Config.scala
@@ -42,4 +42,9 @@ object Config extends Serializable {
*/
val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+ /**
+ * nullable
+ */
+ val NULLABLE = "nullable"
+
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
index 0e5c6d123..cfc166c98 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.util.Bytes
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
-import org.apache.seatunnel.spark.hbase.Config.{CATALOG,
HBASE_ZOOKEEPER_QUORUM, SAVE_MODE, STAGING_DIR}
+import org.apache.seatunnel.spark.hbase.Config.{CATALOG,
HBASE_ZOOKEEPER_QUORUM, NULLABLE, SAVE_MODE, STAGING_DIR}
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.spark.internal.Logging
@@ -52,7 +52,8 @@ class Hbase extends SparkBatchSink with Logging {
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
- SAVE_MODE -> HbaseSaveMode.Append.toString.toLowerCase))
+ SAVE_MODE -> HbaseSaveMode.Append.toString.toLowerCase,
+ NULLABLE -> false))
config = config.withFallback(defaultConfig)
hbaseConf =
HBaseConfiguration.create(env.getSparkSession.sessionState.newHadoopConf())
@@ -73,6 +74,7 @@ class Hbase extends SparkBatchSink with Logging {
val colNames = df.columns
val catalog = config.getString(CATALOG)
val stagingDir = config.getString(STAGING_DIR) + "/" +
System.currentTimeMillis().toString
+ val nullable = config.getBoolean(NULLABLE)
// convert all columns type to string
for (colName <- colNames) {
@@ -118,6 +120,9 @@ class Hbase extends SparkBatchSink with Logging {
val qualifier = c._2
val value = r.getAs[String](c._3)
if (value == null) {
+ if (nullable) {
+ familyQualifiersValues += (family, qualifier, null)
+ }
break
}
familyQualifiersValues += (family, qualifier,
Bytes.toBytes(value))