[HOTFIX][Streaming] Avoid throwing NPE when deleting the streaming lock file

We should check whether the lock file exists before deleting it.
If the lock file exists, delete it.
If the lock file does not exist, do nothing.
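
For illustration, a minimal sketch of the pattern this fix applies (the helper name deleteStreamingLockIfExists and the import path are assumptions for this sketch, not part of the actual change; the real change is in the diff below):

    // Illustrative sketch only: delete the streaming lock file only when it
    // exists, so a missing file can no longer cause an NPE.
    import org.apache.carbondata.core.datastore.impl.FileFactory

    def deleteStreamingLockIfExists(lockFilePath: String): Unit = {
      val lockFile = FileFactory.getCarbonFile(lockFilePath, FileFactory.getConfiguration)
      if (lockFile.exists()) {
        if (!lockFile.delete()) {
          // The patched code logs a warning via LOGGER here; a plain message
          // keeps this sketch self-contained.
          System.err.println("failed to delete lock file: " + lockFilePath)
        }
      }
    }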

This closes #2775


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c0163616
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c0163616
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c0163616

Branch: refs/heads/branch-1.5
Commit: c016361639df899eeefa956e2dcb23fa962e6e7f
Parents: 8284d9e
Author: QiangCai <[email protected]>
Authored: Thu Sep 27 20:32:40 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Fri Sep 28 19:39:03 2018 +0800

----------------------------------------------------------------------
 .../management/CarbonAlterTableCompactionCommand.scala       | 8 ++++++--
 .../spark/carbondata/TestStreamingTableOperation.scala       | 3 +++
 2 files changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0163616/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 8b6dabd..b699ec1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -334,8 +334,12 @@ case class CarbonAlterTableCompactionCommand(
     val streamingLock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier,
       LockUsage.STREAMING_LOCK)
-    if (!FileFactory.getCarbonFile(streamingLock.getLockFilePath).delete()) {
-       LOGGER.warn("failed to delete lock file: " + 
streamingLock.getLockFilePath)
+    val lockFile =
+      FileFactory.getCarbonFile(streamingLock.getLockFilePath, 
FileFactory.getConfiguration)
+    if (lockFile.exists()) {
+      if (!lockFile.delete()) {
+        LOGGER.warn("failed to delete lock file: " + 
streamingLock.getLockFilePath)
+      }
     }
     try {
       if (streamingLock.lockWithRetries()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0163616/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index c4e3517..43c1e5a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -1506,6 +1506,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
   }
 
   test("auto hand off, close and reopen streaming table") {
+    sql("alter table streaming.stream_table_reopen compact 'close_streaming'")
+    sql("ALTER TABLE streaming.stream_table_reopen SET 
TBLPROPERTIES('streaming'='true')")
+
     executeStreamingIngest(
       tableName = "stream_table_reopen",
       batchNums = 2,
