This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
new 1c8bce621 [Hotfix][Connector-V1][Hbase] When the written DataFrame is empty,
the staging directory does not exist when the HFiles are bulk loaded (#3169) (#3424)
1c8bce621 is described below
commit 1c8bce621de1cafc038281d2acd265024a2993fd
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 15:22:26 2022 +0800
[Hotfix][Connector-V1][Hbase] When the written DataFrame is empty, the staging
directory does not exist when the HFiles are bulk loaded (#3169) (#3424)
Co-authored-by: zhouyao <[email protected]>
Co-authored-by: Carl-Zhou-CN
<[email protected]>
Co-authored-by: zhouyao <[email protected]>
---
.../apache/seatunnel/spark/hbase/sink/Hbase.scala | 33 ++++++++++------------
1 file changed, 15 insertions(+), 18 deletions(-)
diff --git
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
index cfc166c98..74e92a360 100644
---
a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
+++
b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
@@ -18,9 +18,8 @@ package org.apache.seatunnel.spark.hbase.sink
import scala.collection.JavaConversions._
import scala.util.control.Breaks._
-
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.spark.{ByteArrayWrapper,
FamiliesQualifiersValues, HBaseContext}
@@ -88,6 +87,8 @@ class Hbase extends SparkBatchSink with Logging {
val columnFamily = htc.getColumnFamilies
val saveMode = config.getString(SAVE_MODE).toLowerCase
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
+ val stagingPath = new Path(stagingDir)
+ val fs = stagingPath.getFileSystem(hbaseContext.config)
try {
if (saveMode == HbaseSaveMode.Overwrite.toString.toLowerCase) {
@@ -132,31 +133,27 @@ class Hbase extends SparkBatchSink with Logging {
},
stagingDir)
- val load = new LoadIncrementalHFiles(hbaseConf)
- val table = hbaseConn.getTable(tableName)
- load.doBulkLoad(
- new Path(stagingDir),
- hbaseConn.getAdmin,
- table,
- hbaseConn.getRegionLocator(tableName))
+ if (fs.exists(stagingPath)) {
+ val load = new LoadIncrementalHFiles(hbaseConf)
+ val table = hbaseConn.getTable(tableName)
+ load.doBulkLoad(
+ stagingPath,
+ hbaseConn.getAdmin,
+ table,
+ hbaseConn.getRegionLocator(tableName))
+ }
} finally {
if (hbaseConn != null) {
hbaseConn.close()
}
-
- cleanUpStagingDir(stagingDir)
+ cleanUpStagingDir(stagingPath, fs)
}
}
- private def cleanUpStagingDir(stagingDir: String): Unit = {
- val stagingPath = new Path(stagingDir)
- val fs = stagingPath.getFileSystem(hbaseContext.config)
+ private def cleanUpStagingDir(stagingPath: Path, fs: FileSystem): Unit = {
if (!fs.delete(stagingPath, true)) {
- logWarning(s"clean staging dir $stagingDir failed")
- }
- if (fs != null) {
- fs.close()
+ logWarning(s"clean staging dir $stagingPath failed")
}
}