[HOTFIX][Streaming] Avoid throwing NPE during deleting the streaming lock file
We should check whether the lock file is exists or not before deleting it. If the lock file is exists, need to delete it. If the lock file is not exists, no need to do anything. This closes #2775 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c0163616 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c0163616 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c0163616 Branch: refs/heads/branch-1.5 Commit: c016361639df899eeefa956e2dcb23fa962e6e7f Parents: 8284d9e Author: QiangCai <[email protected]> Authored: Thu Sep 27 20:32:40 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Fri Sep 28 19:39:03 2018 +0800 ---------------------------------------------------------------------- .../management/CarbonAlterTableCompactionCommand.scala | 8 ++++++-- .../spark/carbondata/TestStreamingTableOperation.scala | 3 +++ 2 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0163616/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 8b6dabd..b699ec1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -334,8 +334,12 @@ case class CarbonAlterTableCompactionCommand( val streamingLock = CarbonLockFactory.getCarbonLockObj( carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier, LockUsage.STREAMING_LOCK) - if (!FileFactory.getCarbonFile(streamingLock.getLockFilePath).delete()) { - LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath) + val lockFile = + FileFactory.getCarbonFile(streamingLock.getLockFilePath, FileFactory.getConfiguration) + if (lockFile.exists()) { + if (!lockFile.delete()) { + LOGGER.warn("failed to delete lock file: " + streamingLock.getLockFilePath) + } } try { if (streamingLock.lockWithRetries()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c0163616/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index c4e3517..43c1e5a 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -1506,6 +1506,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } test("auto hand off, close and reopen streaming table") { + sql("alter table streaming.stream_table_reopen compact 'close_streaming'") + sql("ALTER TABLE streaming.stream_table_reopen SET TBLPROPERTIES('streaming'='true')") + executeStreamingIngest( tableName = "stream_table_reopen", batchNums = 2,
