This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch 2.1.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/2.1.4-prepare by this push:
     new 1c8bce621 [Hotfix][Connector-V1][Hbase] When the written df is empty, the directory does not exist when the load file is loaded (#3169) (#3424)
1c8bce621 is described below

commit 1c8bce621de1cafc038281d2acd265024a2993fd
Author: Kirs <[email protected]>
AuthorDate: Mon Nov 14 15:22:26 2022 +0800

    [Hotfix][Connector-V1][Hbase] When the written df is empty, the directory does not exist when the load file is loaded (#3169) (#3424)
    
    Co-authored-by: zhouyao <[email protected]>
    
    Co-authored-by: Carl-Zhou-CN <[email protected]>
    Co-authored-by: zhouyao <[email protected]>
---
 .../apache/seatunnel/spark/hbase/sink/Hbase.scala  | 33 ++++++++++------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
index cfc166c98..74e92a360 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-hbase/src/main/scala/org/apache/seatunnel/spark/hbase/sink/Hbase.scala
@@ -18,9 +18,8 @@ package org.apache.seatunnel.spark.hbase.sink
 
 import scala.collection.JavaConversions._
 import scala.util.control.Breaks._
-
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
 import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, 
FamiliesQualifiersValues, HBaseContext}
@@ -88,6 +87,8 @@ class Hbase extends SparkBatchSink with Logging {
     val columnFamily = htc.getColumnFamilies
     val saveMode = config.getString(SAVE_MODE).toLowerCase
     val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
+    val stagingPath = new Path(stagingDir)
+    val fs = stagingPath.getFileSystem(hbaseContext.config)
 
     try {
       if (saveMode == HbaseSaveMode.Overwrite.toString.toLowerCase) {
@@ -132,31 +133,27 @@ class Hbase extends SparkBatchSink with Logging {
         },
         stagingDir)
 
-      val load = new LoadIncrementalHFiles(hbaseConf)
-      val table = hbaseConn.getTable(tableName)
-      load.doBulkLoad(
-        new Path(stagingDir),
-        hbaseConn.getAdmin,
-        table,
-        hbaseConn.getRegionLocator(tableName))
+      if (fs.exists(stagingPath)) {
+        val load = new LoadIncrementalHFiles(hbaseConf)
+        val table = hbaseConn.getTable(tableName)
+        load.doBulkLoad(
+          stagingPath,
+          hbaseConn.getAdmin,
+          table,
+          hbaseConn.getRegionLocator(tableName))
+      }
 
     } finally {
       if (hbaseConn != null) {
         hbaseConn.close()
       }
-
-      cleanUpStagingDir(stagingDir)
+      cleanUpStagingDir(stagingPath, fs)
     }
   }
 
-  private def cleanUpStagingDir(stagingDir: String): Unit = {
-    val stagingPath = new Path(stagingDir)
-    val fs = stagingPath.getFileSystem(hbaseContext.config)
+  private def cleanUpStagingDir(stagingPath: Path, fs: FileSystem): Unit = {
     if (!fs.delete(stagingPath, true)) {
-      logWarning(s"clean staging dir $stagingDir failed")
-    }
-    if (fs != null) {
-      fs.close()
+      logWarning(s"clean staging dir $stagingPath failed")
     }
   }
 

Reply via email to