This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c8c0211  [CARBONDATA-3378]Display original query in Indexserver Job
c8c0211 is described below

commit c8c0211c33371bf5cb5b2b912553076361097cc0
Author: BJangir <[email protected]>
AuthorDate: Tue May 7 19:06:36 2019 +0530

    [CARBONDATA-3378]Display original query in Indexserver Job
    
    When a query is fired in the main JDBC server, there is no mapping for it
    in the Index Server.
    It is difficult to find which job in the Index Server belongs to which
    query, especially with concurrent queries.
    This PR displays the query in the Index Server as well, along with the
    execution ID.
    
    This closes #3208
---
 .../core/constants/CarbonCommonConstants.java      |  5 +++
 .../core/datamap/DistributableDataMapFormat.java   | 48 ++++++++++++++++++++++
 .../carbondata/indexserver/DataMapJobs.scala       | 23 +++++++++++
 .../carbondata/indexserver/IndexServer.scala       |  6 +++
 4 files changed, 82 insertions(+)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 311019c..8b39343 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2184,4 +2184,9 @@ public final class CarbonCommonConstants {
 
   public static final String LOAD_SYNC_TIME = "load_sync_time";
 
+  /**
+   * Property key for the maximum length of the job description (the query text) that is
+   * attached to a request before it is sent to the index server. Longer descriptions are
+   * truncated to this many characters.
+   */
+  public  static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
+          "carbon.index.server.max.jobname.length";
+
+  /**
+   * Default value for the "carbon.index.server.max.jobname.length" property, kept as a
+   * string because it is parsed with Integer.parseInt by the consumer.
+   */
+  public  static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
+          "50";
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
 
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 57540e4..0478b40 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import 
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
@@ -36,6 +37,8 @@ import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 
 import org.apache.commons.lang.StringUtils;
@@ -78,6 +81,11 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
 
   private ReadCommittedScope readCommittedScope;
 
+  // Identifier of the job group of the originating query; written to and read back from the
+  // serialized form of this object. Defaults to the empty string.
+  private String taskGroupId = "";
+
+  // Description of the originating job (the query text); written to and read back from the
+  // serialized form of this object. Defaults to the empty string.
+  private String taskGroupDesc = "";
+
+
   DistributableDataMapFormat() {
 
   }
@@ -270,6 +278,8 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
       out.writeBoolean(false);
     }
     out.writeUTF(dataMapToClear);
+    out.writeUTF(taskGroupId);
+    out.writeUTF(taskGroupDesc);
   }
 
   @Override
@@ -311,6 +321,8 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
           .convertStringToObject(new String(filterResolverBytes, 
Charset.defaultCharset()));
     }
     this.dataMapToClear = in.readUTF();
+    this.taskGroupId = in.readUTF();
+    this.taskGroupDesc = in.readUTF();
   }
 
   private void initReadCommittedScope() throws IOException {
@@ -335,6 +347,42 @@ public class DistributableDataMapFormat extends 
FileInputFormat<Void, ExtendedBl
     return isJobToClearDataMaps;
   }
 
+  /** Returns the task group id currently set on this format. */
+  public String getTaskGroupId() {
+    return taskGroupId;
+  }
+
+  /**
+   * Sets the task group id. The index server uses this value to display the execution id
+   * of the originating query.
+   */
+  public void setTaskGroupId(String taskGroupId) {
+    this.taskGroupId = taskGroupId;
+  }
+
+  /** Returns the task group description currently set on this format. */
+  public String getTaskGroupDesc() {
+    return taskGroupDesc;
+  }
+
+  /* setTaskGroupId will be used for Index server to display Query
+  *  If Job name is >CARBON_INDEX_SERVER_JOBNAME_LENGTH
+  *  then need to cut as transferring big query to IndexServer will be costly.
+  */
+  public void setTaskGroupDesc(String taskGroupDesc) {
+    int maxJobLenth;
+    try {
+      String maxJobLenthString = CarbonProperties.getInstance()
+              
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH ,
+                      
CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+      maxJobLenth = Integer.parseInt(maxJobLenthString);
+    } catch (Exception e) {
+      String maxJobLenthString = CarbonProperties.getInstance()
+              
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+      maxJobLenth = Integer.parseInt(maxJobLenthString);
+    }
+    if (taskGroupDesc.length() > maxJobLenth) {
+      this.taskGroupDesc = taskGroupDesc.substring(0, maxJobLenth);
+    } else {
+      this.taskGroupDesc = taskGroupDesc;
+    }
+  }
+
   /** Returns the filter resolver currently held by this format. */
   public FilterResolverIntf getFilterResolverIntf() {
     return filterResolverIntf;
   }
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 0ee4ebb..698dd58 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -21,6 +21,7 @@ import java.util
 import scala.collection.JavaConverters._
 
 import org.apache.log4j.Logger
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.SizeEstimator
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
       LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
     }
     val (resonse, time) = logTime {
+      val spark = SparkSQLUtil.getSparkSession
+      val taskGroupId = 
spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+        case null => ""
+        case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+      }
+      val taskGroupDesc = 
spark.sparkContext.getLocalProperty("spark.job.description") match {
+        case null => ""
+        case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+      }
+      dataMapFormat.setTaskGroupId(taskGroupId)
+      dataMapFormat.setTaskGroupDesc(taskGroupDesc)
       var filterInf = dataMapFormat.getFilterResolverIntf
       val filterProcessor = new FilterExpressionProcessor
       filterInf = removeSparkUnknown(filterInf,
@@ -95,6 +107,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
 class EmbeddedDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat): 
util.List[ExtendedBlocklet] = {
+    val spark = SparkSQLUtil.getSparkSession
+    val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id") 
match {
+      case null => ""
+      case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+    }
+    val taskGroupDesc = 
spark.sparkContext.getLocalProperty("spark.job.description") match {
+      case null => ""
+      case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+    }
+    dataMapFormat.setTaskGroupId(taskGroupId)
+    dataMapFormat.setTaskGroupDesc(taskGroupDesc)
     IndexServer.getSplits(dataMapFormat).toList.asJava
   }
 
diff --git 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index f066095..9eee6d7 100644
--- 
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -100,6 +100,8 @@ object IndexServer extends ServerInterface {
   }
 
   def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet] 
= doAs {
+    sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", 
request.getTaskGroupId)
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", 
request.getTaskGroupDesc)
     val splits = new DistributedPruneRDD(sparkSession, request).collect()
     DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
     if (request.isJobToClearDataMaps) {
@@ -110,11 +112,15 @@ object IndexServer extends ServerInterface {
 
   override def invalidateSegmentCache(databaseName: String, tableName: String,
       segmentIds: Array[String]): Unit = doAs {
+    val jobgroup: String = " Invalided Segment Cache for " + databaseName + 
"." + tableName
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", 
jobgroup)
     new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName, 
segmentIds.toList)
       .collect()
   }
 
   override def showCache(tableName: String = ""): Array[String] = doAs {
+    val jobgroup: String = "Show Cache for " + tableName
+    sparkSession.sparkContext.setLocalProperty("spark.job.description", 
jobgroup)
     new DistributedShowCacheRDD(sparkSession, tableName).collect()
   }
 

Reply via email to