Repository: carbondata
Updated Branches:
  refs/heads/master a5e7b41b5 -> 8c17ceead


[CARBONDATA-1361] Reduced the SDV cluster time to 22 minutes

This closes #1225


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8c17ceea
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8c17ceea
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8c17ceea

Branch: refs/heads/master
Commit: 8c17ceeadffbb8525abc5dd5ea2435ec1bd1c864
Parents: a5e7b41
Author: Ravindra Pesala <ravi.pes...@gmail.com>
Authored: Fri Aug 4 19:22:09 2017 +0530
Committer: chenliang613 <chenliang...@apache.org>
Committed: Sun Aug 6 12:17:24 2017 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |  8 +--
 .../carbondata/core/util/SessionParams.java     | 21 +++++-
 integration/spark-common-cluster-test/pom.xml   |  4 +-
 .../cluster/sdv/suite/SDVSuites.scala           | 73 ++++++++++++--------
 .../apache/carbondata/spark/rdd/CarbonRDD.scala | 15 ++--
 .../sql/test/ResourceRegisterAndCopier.scala    | 52 +++++++++++---
 .../spark/sql/test/TestQueryExecutor.scala      |  1 +
 .../sql/test/Spark2TestQueryExecutor.scala      |  9 +--
 8 files changed, 128 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ee9e460..f4d4f70 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -138,10 +138,6 @@
     </profile>
     <profile>
       <id>spark-1.6</id>
-      <!-- default -->
-      <activation>
-        <activeByDefault>true</activeByDefault>
-      </activation>
       <dependencies>
         <dependency>
           <groupId>org.apache.carbondata</groupId>
@@ -152,6 +148,10 @@
     </profile>
     <profile>
       <id>spark-2.1</id>
+      <!-- default -->
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
       <dependencies>
         <dependency>
           <groupId>org.apache.carbondata</groupId>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index f06ba01..6d8c900 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -29,7 +29,15 @@ import 
org.apache.carbondata.core.exception.InvalidConfigurationException;
 
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
 import static 
org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
-import static org.apache.carbondata.core.constants.CarbonLoadOptionConstants.*;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS;
+import static 
org.apache.carbondata.core.constants.CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE;
 
 /**
  * This class maintains carbon session params
@@ -40,9 +48,11 @@ public class SessionParams implements Serializable {
       LogServiceFactory.getLogService(CacheProvider.class.getName());
 
   private Map<String, String> sProps;
+  private Map<String, String> addedProps;
 
   public SessionParams() {
     sProps = new HashMap<>();
+    addedProps = new HashMap<>();
   }
 
   /**
@@ -70,6 +80,15 @@ public class SessionParams implements Serializable {
     return this;
   }
 
+  public SessionParams addProps(Map<String, String> addedProps) {
+    this.addedProps.putAll(addedProps);
+    return this;
+  }
+
+  public Map<String, String> getAddedProps() {
+    return addedProps;
+  }
+
   /**
    * validate the key value to be set using set command
    * @param key

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common-cluster-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/pom.xml b/integration/spark-common-cluster-test/pom.xml
index 0081e4e..e6ae541 100644
--- a/integration/spark-common-cluster-test/pom.xml
+++ b/integration/spark-common-cluster-test/pom.xml
@@ -124,7 +124,7 @@
             <include>**/*Suite.java</include>
           </includes>
           
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-          <argLine>-Xmx4g -XX:MaxPermSize=512m 
-XX:ReservedCodeCacheSize=512m</argLine>
+          <argLine>-Xmx6g -XX:MaxPermSize=512m 
-XX:ReservedCodeCacheSize=512m</argLine>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
           </systemProperties>
@@ -141,7 +141,7 @@
           
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
           <junitxml>.</junitxml>
           <filereports>CarbonTestSuite.txt</filereports>
-          <argLine>-ea -Xmx6g -XX:MaxPermSize=512m 
-XX:ReservedCodeCacheSize=512m
+          <argLine>-ea -Xmx5g -XX:MaxPermSize=512m 
-XX:ReservedCodeCacheSize=512m
           </argLine>
           <stderr />
           <environmentVariables>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
index de946a0..b9908ea 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/suite/SDVSuites.scala
@@ -70,21 +70,13 @@ class SDVSuites extends Suites with BeforeAndAfterAll {
  */
 class SDVSuites1 extends Suites with BeforeAndAfterAll {
 
-  val suites =                 new AlterTableTestCase ::
-                               new BadRecordTestCase ::
-                               new BatchSortLoad1TestCase ::
-                               new BatchSortLoad2TestCase ::
-                               new BatchSortQueryTestCase ::
-                               new ColumndictTestCase ::
-                               new DataLoadingTestCase ::
-                               new DataLoadingV3TestCase ::
-                               new InvertedindexTestCase ::
-                               new OffheapQuery1TestCase ::
-                               new OffheapQuery2TestCase ::
-                               new OffheapSort1TestCase ::
-                               new OffheapSort2TestCase ::
-                               new PartitionTestCase ::
-                               new QueriesBasicTestCase :: Nil
+  val suites =                   new BadRecordTestCase ::
+                                 new BatchSortLoad1TestCase ::
+                                 new BatchSortQueryTestCase ::
+                                 new DataLoadingTestCase ::
+                                 new OffheapSort2TestCase ::
+                                 new PartitionTestCase ::
+    new QueriesBasicTestCase :: Nil
 
   override val nestedSuites = suites.toIndexedSeq
 
@@ -100,20 +92,43 @@ class SDVSuites1 extends Suites with BeforeAndAfterAll {
  */
 class SDVSuites2 extends Suites with BeforeAndAfterAll {
 
-  val suites =    new QueriesBVATestCase ::
-                  new QueriesCompactionTestCase ::
-                  new QueriesExcludeDictionaryTestCase ::
-                  new QueriesIncludeDictionaryTestCase ::
-                  new QueriesNormalTestCase ::
-                  new QueriesRangeFilterTestCase ::
-                  new QueriesSparkBlockDistTestCase ::
-                  new ShowLoadsTestCase ::
-                  new SinglepassTestCase ::
-                  new SortColumnTestCase ::
-                  new TimestamptypesTestCase ::
-                  new V3offheapvectorTestCase ::
-                  new Vector1TestCase ::
-                  new Vector2TestCase :: Nil
+  val suites =      new QueriesBVATestCase ::
+                    new QueriesCompactionTestCase ::
+                    new QueriesExcludeDictionaryTestCase ::
+                    new QueriesIncludeDictionaryTestCase :: Nil
+
+  override val nestedSuites = suites.toIndexedSeq
+
+  override protected def afterAll() = {
+    println("---------------- Stopping spark -----------------")
+    TestQueryExecutor.INSTANCE.stop()
+    println("---------------- Stopped spark -----------------")
+  }
+}
+
+/**
+ * Suite class for all tests.
+ */
+class SDVSuites3 extends Suites with BeforeAndAfterAll {
+
+  val suites =      new AlterTableTestCase ::
+                    new BatchSortLoad2TestCase ::
+                    new InvertedindexTestCase ::
+                    new OffheapQuery1TestCase ::
+                    new OffheapQuery2TestCase ::
+                    new OffheapSort1TestCase ::
+                    new ShowLoadsTestCase ::
+                    new SinglepassTestCase ::
+                    new SortColumnTestCase ::
+                    new TimestamptypesTestCase ::
+                    new V3offheapvectorTestCase ::
+                    new Vector1TestCase ::
+                    new Vector2TestCase ::
+                    new QueriesNormalTestCase ::
+                    new ColumndictTestCase ::
+                    new QueriesRangeFilterTestCase ::
+                    new QueriesSparkBlockDistTestCase ::
+                    new DataLoadingV3TestCase :: Nil
 
   override val nestedSuites = suites.toIndexedSeq
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index b137d6d..a35c896 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -34,9 +34,15 @@ import org.apache.carbondata.core.util.{CarbonProperties, 
CarbonSessionInfo, Car
 abstract class CarbonRDD[T: ClassTag](@transient sc: SparkContext,
     @transient private var deps: Seq[Dependency[_]]) extends RDD[T](sc, deps) {
 
-  val carbonSessionInfo: CarbonSessionInfo = 
ThreadLocalSessionInfo.getCarbonSessionInfo
-
-//  val addedProperty = CarbonProperties.getInstance().getAddedProperty
+  val carbonSessionInfo: CarbonSessionInfo = {
+    var info = ThreadLocalSessionInfo.getCarbonSessionInfo
+    if (info == null || info.getSessionParams == null) {
+      info = new CarbonSessionInfo
+      info.setSessionParams(new SessionParams())
+    }
+    
info.getSessionParams.addProps(CarbonProperties.getInstance().getAddedProperty)
+    info
+  }
 
   /** Construct an RDD with just a one-to-one dependency on one parent */
   def this(@transient oneParent: RDD[_]) =
@@ -50,7 +56,8 @@ abstract class CarbonRDD[T: ClassTag](@transient sc: 
SparkContext,
     val carbonTaskInfo = new CarbonTaskInfo
     carbonTaskInfo.setTaskId(System.nanoTime)
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
-//    addedProperty.asScala.map(f => 
CarbonProperties.getInstance().addProperty(f._1, f._2))
+    carbonSessionInfo.getSessionParams.getAddedProps.asScala.
+      map(f => CarbonProperties.getInstance().addProperty(f._1, f._2))
     internalCompute(split, context)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index 6364630..b025c4c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.IOUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.HdfsFileLock
 import org.apache.carbondata.core.util.CarbonUtil
 
 /**
@@ -46,19 +47,48 @@ object ResourceRegisterAndCopier {
     if (!file.exists()) {
       sys.error(s"""Provided path $hdfsPath does not exist""")
     }
-    val resources = readDataFiles(dataFilesPath)
-    resources.foreach { file =>
-      val hdfsDataPath = hdfsPath + "/" + file
-      val rsFile = FileFactory.getCarbonFile(hdfsDataPath, fileType)
-      if (!rsFile.exists()) {
-        val target = resourcePath + "/" + file
-        new File(resourcePath + "/" + file.substring(0, 
file.lastIndexOf("/"))).mkdirs()
-        downloadFile(link, file, target)
-        // copy it
-        copyLocalFile(hdfsDataPath, target)
-        new File(target).delete()
+    val lock = new HdfsFileLock("", "resource.lock")
+    var bool = false
+    try {
+      bool = lockWithRetries(lock)
+      if (bool) {
+        val resources = readDataFiles(dataFilesPath)
+        resources.foreach { file =>
+          val hdfsDataPath = hdfsPath + "/" + file
+          val rsFile = FileFactory.getCarbonFile(hdfsDataPath, fileType)
+          if (!rsFile.exists()) {
+            val target = resourcePath + "/" + file
+            new File(resourcePath + "/" + file.substring(0, 
file.lastIndexOf("/"))).mkdirs()
+            downloadFile(link, file, target)
+            // copy it
+            copyLocalFile(hdfsDataPath, target)
+            new File(target).delete()
+          }
+        }
       }
+    } finally {
+      if (bool) {
+        lock.unlock()
+      }
+    }
+  }
+
+  def lockWithRetries(lock: HdfsFileLock): Boolean = {
+    try {
+      var i = 0
+      while (i < 10) {
+        if (lock.lock()) {
+          return true
+        } else {
+          Thread.sleep(30 * 1000L)
+        }
+        i += 1
+      }
+    } catch {
+      case e: InterruptedException =>
+        return false
     }
+    false
   }
 
   def readDataFiles(dataFilesPath: String): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
index cf90912..6f177e6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/TestQueryExecutor.scala
@@ -71,6 +71,7 @@ object TestQueryExecutor {
   }
 
   val resourcesPath = if (hdfsUrl.startsWith("hdfs://")) {
+    System.setProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, hdfsUrl)
     ResourceRegisterAndCopier.
       copyResourcesifNotExists(hdfsUrl, 
s"$integrationPath/spark-common-test/src/test/resources",
         
s"$integrationPath//spark-common-cluster-test/src/test/resources/testdatafileslist.txt")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8c17ceea/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index 8aa25a4..2c163a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -53,10 +53,11 @@ object Spark2TestQueryExecutor {
   val conf = new SparkConf()
   if (!TestQueryExecutor.masterUrl.startsWith("local")) {
     conf.setJars(TestQueryExecutor.jars).
-      set("spark.driver.memory", "4g").
-      set("spark.executor.memory", "8g").
-      set("spark.executor.cores", "4").
-      set("spark.cores.max", "8")
+      set("spark.driver.memory", "6g").
+      set("spark.executor.memory", "4g").
+      set("spark.executor.cores", "2").
+      set("spark.executor.instances", "2").
+      set("spark.cores.max", "4")
     FileFactory.getConfiguration.
       set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
   }

Reply via email to