This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c8c0211 [CARBONDATA-3378]Display original query in Indexserver Job
c8c0211 is described below
commit c8c0211c33371bf5cb5b2b912553076361097cc0
Author: BJangir <[email protected]>
AuthorDate: Tue May 7 19:06:36 2019 +0530
[CARBONDATA-3378]Display original query in Indexserver Job
When any query is fired in the main JDBC server, there is no
mapping of it in the Index server.
It is difficult to find which job in the index server belongs to which
query, especially with concurrent queries.
This PR will also display the query in the index server, along with the execution id.
This closes #3208
---
.../core/constants/CarbonCommonConstants.java | 5 +++
.../core/datamap/DistributableDataMapFormat.java | 48 ++++++++++++++++++++++
.../carbondata/indexserver/DataMapJobs.scala | 23 +++++++++++
.../carbondata/indexserver/IndexServer.scala | 6 +++
4 files changed, 82 insertions(+)
diff --git
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 311019c..8b39343 100644
---
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2184,4 +2184,9 @@ public final class CarbonCommonConstants {
public static final String LOAD_SYNC_TIME = "load_sync_time";
+ public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH =
+ "carbon.index.server.max.jobname.length";
+
+ public static final String CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT =
+ "50";
}
diff --git
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
index 57540e4..0478b40 100644
---
a/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
+++
b/core/src/main/java/org/apache/carbondata/core/datamap/DistributableDataMapFormat.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
import
org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
@@ -36,6 +37,8 @@ import
org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
+import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.ObjectSerializationUtil;
import org.apache.commons.lang.StringUtils;
@@ -78,6 +81,11 @@ public class DistributableDataMapFormat extends
FileInputFormat<Void, ExtendedBl
private ReadCommittedScope readCommittedScope;
+ private String taskGroupId = "";
+
+ private String taskGroupDesc = "";
+
+
DistributableDataMapFormat() {
}
@@ -270,6 +278,8 @@ public class DistributableDataMapFormat extends
FileInputFormat<Void, ExtendedBl
out.writeBoolean(false);
}
out.writeUTF(dataMapToClear);
+ out.writeUTF(taskGroupId);
+ out.writeUTF(taskGroupDesc);
}
@Override
@@ -311,6 +321,8 @@ public class DistributableDataMapFormat extends
FileInputFormat<Void, ExtendedBl
.convertStringToObject(new String(filterResolverBytes,
Charset.defaultCharset()));
}
this.dataMapToClear = in.readUTF();
+ this.taskGroupId = in.readUTF();
+ this.taskGroupDesc = in.readUTF();
}
private void initReadCommittedScope() throws IOException {
@@ -335,6 +347,42 @@ public class DistributableDataMapFormat extends
FileInputFormat<Void, ExtendedBl
return isJobToClearDataMaps;
}
+ public String getTaskGroupId() {
+ return taskGroupId;
+ }
+
+ /* setTaskGroupId will be used for Index server to display ExecutionId */
+ public void setTaskGroupId(String taskGroupId) {
+ this.taskGroupId = taskGroupId;
+ }
+
+ public String getTaskGroupDesc() {
+ return taskGroupDesc;
+ }
+
+ /* setTaskGroupDesc will be used for Index server to display the query.
+ * If the job name is longer than CARBON_INDEX_SERVER_JOBNAME_LENGTH
+ * it needs to be truncated, as transferring a big query to the IndexServer will be costly.
+ */
+ public void setTaskGroupDesc(String taskGroupDesc) {
+ int maxJobLenth;
+ try {
+ String maxJobLenthString = CarbonProperties.getInstance()
+
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH ,
+
CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+ maxJobLenth = Integer.parseInt(maxJobLenthString);
+ } catch (Exception e) {
+ String maxJobLenthString = CarbonProperties.getInstance()
+
.getProperty(CarbonCommonConstants.CARBON_INDEX_SERVER_JOBNAME_LENGTH_DEFAULT);
+ maxJobLenth = Integer.parseInt(maxJobLenthString);
+ }
+ if (taskGroupDesc.length() > maxJobLenth) {
+ this.taskGroupDesc = taskGroupDesc.substring(0, maxJobLenth);
+ } else {
+ this.taskGroupDesc = taskGroupDesc;
+ }
+ }
+
public FilterResolverIntf getFilterResolverIntf() {
return filterResolverIntf;
}
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 0ee4ebb..698dd58 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -21,6 +21,7 @@ import java.util
import scala.collection.JavaConverters._
import org.apache.log4j.Logger
+import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.spark.util.SizeEstimator
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -47,6 +48,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
LOGGER.debug(s"Size of message sent to Index Server: $messageSize")
}
val (resonse, time) = logTime {
+ val spark = SparkSQLUtil.getSparkSession
+ val taskGroupId =
spark.sparkContext.getLocalProperty("spark.jobGroup.id") match {
+ case null => ""
+ case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+ }
+ val taskGroupDesc =
spark.sparkContext.getLocalProperty("spark.job.description") match {
+ case null => ""
+ case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+ }
+ dataMapFormat.setTaskGroupId(taskGroupId)
+ dataMapFormat.setTaskGroupDesc(taskGroupDesc)
var filterInf = dataMapFormat.getFilterResolverIntf
val filterProcessor = new FilterExpressionProcessor
filterInf = removeSparkUnknown(filterInf,
@@ -95,6 +107,17 @@ class DistributedDataMapJob extends AbstractDataMapJob {
class EmbeddedDataMapJob extends AbstractDataMapJob {
override def execute(dataMapFormat: DistributableDataMapFormat):
util.List[ExtendedBlocklet] = {
+ val spark = SparkSQLUtil.getSparkSession
+ val taskGroupId = spark.sparkContext.getLocalProperty("spark.jobGroup.id")
match {
+ case null => ""
+ case _ => spark.sparkContext.getLocalProperty("spark.jobGroup.id")
+ }
+ val taskGroupDesc =
spark.sparkContext.getLocalProperty("spark.job.description") match {
+ case null => ""
+ case _ => spark.sparkContext.getLocalProperty("spark.job.description")
+ }
+ dataMapFormat.setTaskGroupId(taskGroupId)
+ dataMapFormat.setTaskGroupDesc(taskGroupDesc)
IndexServer.getSplits(dataMapFormat).toList.asJava
}
diff --git
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index f066095..9eee6d7 100644
---
a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++
b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -100,6 +100,8 @@ object IndexServer extends ServerInterface {
}
def getSplits(request: DistributableDataMapFormat): Array[ExtendedBlocklet]
= doAs {
+ sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id",
request.getTaskGroupId)
+ sparkSession.sparkContext.setLocalProperty("spark.job.description",
request.getTaskGroupDesc)
val splits = new DistributedPruneRDD(sparkSession, request).collect()
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
if (request.isJobToClearDataMaps) {
@@ -110,11 +112,15 @@ object IndexServer extends ServerInterface {
override def invalidateSegmentCache(databaseName: String, tableName: String,
segmentIds: Array[String]): Unit = doAs {
+ val jobgroup: String = " Invalided Segment Cache for " + databaseName +
"." + tableName
+ sparkSession.sparkContext.setLocalProperty("spark.job.description",
jobgroup)
new InvalidateSegmentCacheRDD(sparkSession, databaseName, tableName,
segmentIds.toList)
.collect()
}
override def showCache(tableName: String = ""): Array[String] = doAs {
+ val jobgroup: String = "Show Cache for " + tableName
+ sparkSession.sparkContext.setLocalProperty("spark.job.description",
jobgroup)
new DistributedShowCacheRDD(sparkSession, tableName).collect()
}