Repository: carbondata
Updated Branches:
  refs/heads/master 8c17ceead -> 7b5a1c3b9


[CARBONDATA-1368] Fix HDFS lock issue in SDV cluster

HDFS lock issue in SDV cluster
All runs share the same lock, so some tests fail randomly. This PR fixes the
issue by taking the lock from the corresponding store location.

This closes #1247


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7b5a1c3b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7b5a1c3b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7b5a1c3b

Branch: refs/heads/master
Commit: 7b5a1c3b9bbf19d209fee3c7a36ce0d9a4dce8b4
Parents: 8c17cee
Author: Ravindra Pesala <ravi.pes...@gmail.com>
Authored: Tue Aug 8 23:23:07 2017 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Aug 9 13:01:35 2017 +0800

----------------------------------------------------------------------
 .../org/apache/carbondata/core/locks/HdfsFileLock.java  |  8 ++++++++
 .../cluster/sdv/generated/AlterTableTestCase.scala      |  2 +-
 .../cluster/sdv/generated/DataLoadingTestCase.scala     |  2 +-
 .../org/apache/spark/sql/common/util/QueryTest.scala    |  2 --
 .../spark/sql/test/ResourceRegisterAndCopier.scala      |  3 ++-
 .../org/apache/spark/sql/test/TestQueryExecutor.scala   |  4 ----
 .../apache/spark/sql/test/Spark2TestQueryExecutor.scala | 12 ++++++++++--
 7 files changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java 
b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index 326f8ae..aa24e33 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -73,6 +73,14 @@ public class HdfsFileLock extends AbstractCarbonLock {
   }
 
   /**
+   * @param lockFilePath
+   */
+  public HdfsFileLock(String lockFilePath) {
+    this.location = lockFilePath;
+    initRetry();
+  }
+
+  /**
    * @param tableIdentifier
    * @param lockFile
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index e6520a4..46f82f2 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -1073,7 +1073,7 @@ class AlterTableTestCase extends QueryTest with 
BeforeAndAfterAll {
     prop.addProperty("carbon.enable.auto.load.merge", "false")
     prop.addProperty("carbon.bad.records.action", "FORCE")
     prop.addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-      TestQueryExecutor.storeLocation+"/baaaaaaadrecords")
+      TestQueryExecutor.warehouse+"/baaaaaaadrecords")
   }
 
   override def afterAll: Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
index a931710..3728db0 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/DataLoadingTestCase.scala
@@ -1471,6 +1471,6 @@ class DataLoadingTestCase extends QueryTest with 
BeforeAndAfterAll {
   override protected def beforeAll(): Unit = {
     sql(s"""drop table if exists uniqdata""").collect
     
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-      TestQueryExecutor.storeLocation+"/baaaaaaadrecords")
+      TestQueryExecutor.warehouse + "/baaaaaaadrecords")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
index 292c160..7c78b10 100644
--- 
a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
+++ 
b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala
@@ -125,9 +125,7 @@ class QueryTest extends PlanTest with Suite {
 
   val sqlContext: SQLContext = TestQueryExecutor.INSTANCE.sqlContext
 
-  val storeLocation = TestQueryExecutor.storeLocation
   val resourcesPath = TestQueryExecutor.resourcesPath
-  val integrationPath = TestQueryExecutor.integrationPath
 }
 
 object QueryTest {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index b025c4c..87a60c5 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -47,7 +47,8 @@ object ResourceRegisterAndCopier {
     if (!file.exists()) {
       sys.error(s"""Provided path $hdfsPath does not exist""")
     }
-    val lock = new HdfsFileLock("", "resource.lock")
+    LOGGER.audit("Try downloading resource data")
+    val lock = new HdfsFileLock(hdfsPath + "/resource.lock")
     var bool = false
     try {
       bool = lockWithRetries(lock)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index 6f177e6..d2b565f 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -71,10 +71,6 @@ object TestQueryExecutor {
   }
 
   val resourcesPath = if (hdfsUrl.startsWith("hdfs://")) {
-    System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, hdfsUrl)
-    ResourceRegisterAndCopier.
-      copyResourcesifNotExists(hdfsUrl, 
s"$integrationPath/spark-common-test/src/test/resources",
-        
s"$integrationPath//spark-common-cluster-test/src/test/resources/testdatafileslist.txt")
     hdfsUrl
   } else {
     s"$integrationPath/spark-common-test/src/test/resources"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7b5a1c3b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index 2c163a2..3fed15d 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.test
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql._
-import org.apache.spark.sql.test.TestQueryExecutor.integrationPath
+import org.apache.spark.sql.test.TestQueryExecutor.{hdfsUrl, integrationPath, 
warehouse}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -67,9 +67,17 @@ object Spark2TestQueryExecutor {
     .master(TestQueryExecutor.masterUrl)
     .appName("Spark2TestQueryExecutor")
     .enableHiveSupport()
-    .config("spark.sql.warehouse.dir", TestQueryExecutor.warehouse)
+    .config("spark.sql.warehouse.dir", warehouse)
     .config("spark.sql.crossJoin.enabled", "true")
     .getOrCreateCarbonSession(null, TestQueryExecutor.metastoredb)
+  if (warehouse.startsWith("hdfs://")) {
+    System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, warehouse)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.LOCK_TYPE,
+      CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
+    ResourceRegisterAndCopier.
+      copyResourcesifNotExists(hdfsUrl, 
s"$integrationPath/spark-common-test/src/test/resources",
+        
s"$integrationPath//spark-common-cluster-test/src/test/resources/testdatafileslist.txt")
+  }
   FileFactory.getConfiguration.
     set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
   spark.sparkContext.setLogLevel("ERROR")

Reply via email to