This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new bbeb974 [CARBONDATA-3487] wrong Input metrics (size/record) displayed in spark UI during insert into
bbeb974 is described below
commit bbeb974e22d1b87670926d83dcac30c5ef139856
Author: ajantha-bhat <[email protected]>
AuthorDate: Thu Aug 1 17:16:44 2019 +0530
[CARBONDATA-3487] wrong Input metrics (size/record) displayed in spark UI during insert into
Problem: wrong input metrics (size/record count) are displayed in the Spark UI during insert into.
Cause:
a) The size metric was wrong because the update never reached the Spark input
metrics: NewDataFrameRDD was setting the thread-local id, which was then
overridden by the ScanRDD thread-local id due to the multi-level RDD.
b) The record metric was wrong because, during insert into, ScanRDD is called
by multiple threads, and there was no synchronisation even though one Spark
input metric (task level) is shared by all the concurrent ScanRDD instances.
Solution:
a) To fix the size metric, set the thread-local for the parent RDD and do not
set it again for the child RDD if it is already registered.
b) To fix the record metric, add synchronisation around the record metric
update and reduce its frequency by flushing only after a configurable record
interval (a sketch of both fixes follows below).
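
A minimal, self-contained Java sketch of both fixes, using illustrative names
(InputMetricsSketch, TaskInputMetrics and PerThreadRecorder are hypothetical
stand-ins, not the actual CarbonData or Spark classes):

public final class InputMetricsSketch {

  // Fix (a): for a multi-level RDD chain, only the first compute() on the task
  // (the parent RDD) records its thread id; a child RDD computing afterwards
  // must not overwrite it, otherwise the bytes-read callback is keyed on the
  // wrong thread and the size metric is lost.
  private static final InheritableThreadLocal<Long> THREAD_LOCAL =
      new InheritableThreadLocal<>();

  static void initializeThreadLocal() {
    if (THREAD_LOCAL.get() == null) {
      THREAD_LOCAL.set(Thread.currentThread().getId());
    }
  }

  // Stand-in for Spark's task-level input metrics: one instance per task,
  // shared by all concurrent scan threads of that task.
  static final class TaskInputMetrics {
    long recordsRead;
  }

  // Fix (b): each scan thread buffers its own record count and pushes it to
  // the shared metric under a lock only after 'updateInterval' records, so
  // concurrent increments neither race nor update the metric too frequently.
  static final class PerThreadRecorder {
    private final TaskInputMetrics shared;
    private final long updateInterval;
    private long buffered;

    PerThreadRecorder(TaskInputMetrics shared, long updateInterval) {
      this.shared = shared;
      this.updateInterval = updateInterval;
    }

    void incrementRecordRead(long records) {
      buffered += records;
      if (buffered > updateInterval) {
        flush();
      }
    }

    void close() {
      if (buffered > 0) {
        flush();   // flush whatever is left when the task finishes
      }
    }

    private void flush() {
      synchronized (shared) {
        shared.recordsRead += buffered;
      }
      buffered = 0;
    }
  }
}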
This closes #3345
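
Note on configuration: the flush frequency above is controlled by the new
carbon.input.metrics.update.interval property (default 500000 records; negative
or unparsable values fall back to the default). A small usage sketch, assuming
the existing CarbonProperties.addProperty API:

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public class InputMetricsIntervalExample {
  public static void main(String[] args) {
    // Push input metrics to Spark after every 1,000,000 records instead of
    // the default 500,000.
    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL, "1000000");

    // Reads back the validated value (falls back to the default on bad input).
    System.out.println("interval = " + CarbonProperties.getInputMetricsInterval());
  }
}

Because the property is declared dynamicConfigurable, it can also be set
dynamically per session.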
---
.../core/constants/CarbonCommonConstants.java | 10 ++++++++
.../carbondata/core/util/CarbonProperties.java | 24 +++++++++++++++++
.../carbondata/core/util/TaskMetricsMap.java | 21 ++++++++++++++-
.../apache/carbondata/spark/InitInputMetrics.java | 2 +-
.../spark/load/DataLoadProcessBuilderOnSpark.scala | 2 +-
.../apache/carbondata/spark/rdd/CarbonRDD.scala | 4 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +-
.../org/apache/spark/CarbonInputMetrics.scala | 30 ++++++++++++++++------
.../datamap/IndexDataMapRebuildRDD.scala | 2 +-
9 files changed, 83 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 510bcee..17b191d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1307,6 +1307,16 @@ public final class CarbonCommonConstants {
public static final String IS_DRIVER_INSTANCE_DEFAULT = "false";
/**
+ * property to set input metrics update interval (in records count), after every interval,
+ * input metrics will be updated to spark, else will be updated at the end of the query
+ */
+ @CarbonProperty(dynamicConfigurable = true)
+ public static final String INPUT_METRICS_UPDATE_INTERVAL = "carbon.input.metrics.update.interval";
+
+ public static final Long INPUT_METRICS_UPDATE_INTERVAL_DEFAULT = 500000L;
+
+
+ /**
* property for enabling unsafe based query processing
*/
@CarbonProperty(dynamicConfigurable = true)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 5868664..c60dad8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1749,4 +1749,28 @@ public final class CarbonProperties {
}
return numOfThreadsForPruning;
}
+
+ /**
+ * Validate and get the input metrics interval
+ *
+ * @return input metrics interval
+ */
+ public static Long getInputMetricsInterval() {
+ String metrics = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL);
+ if (metrics == null) {
+ return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+ } else {
+ try {
+ long configuredValue = Long.parseLong(metrics);
+ if (configuredValue < 0) {
+ return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+ } else {
+ return configuredValue;
+ }
+ } catch (Exception ex) {
+ return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+ }
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
index 196fd64..9d4e11c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
@@ -36,7 +36,7 @@ public class TaskMetricsMap {
private static final Logger LOGGER =
LogServiceFactory.getLogService(TaskMetricsMap.class.getName());
- public static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
+ private static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
/**
* In this map we are maintaining all spawned child threads callback info for each parent thread
* here key = parent thread id & values = list of spawned child threads callbacks
@@ -50,6 +50,25 @@ public class TaskMetricsMap {
return taskMetricsMap;
}
+ public static InheritableThreadLocal<Long> getThreadLocal() {
+ return threadLocal;
+ }
+
+
+ /**
+ * initializes thread local to current thread id
+ *
+ * @return
+ */
+ public static void initializeThreadLocal() {
+ // In case of multi level RDD (say insert into scenario, where DataFrameRDD calling ScanRDD)
+ // parent thread id should not be overwritten by child thread id.
+ // so don't set if it is already set.
+ if (threadLocal.get() == null) {
+ threadLocal.set(Thread.currentThread().getId());
+ }
+ }
+
/**
* registers current thread callback using parent thread id
*
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
index 8574a3a..6e4ab40 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
@@ -28,5 +28,5 @@ import org.apache.spark.TaskContext;
*/
public interface InitInputMetrics extends InputMetricsStats {
- void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit);
+ void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit, Long inputMetricsInterval);
}
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 81699b4..ea83827 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -364,7 +364,7 @@ object DataLoadProcessBuilderOnSpark {
TaskContext.get.addTaskCompletionListener { _ =>
CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
}
- TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+ TaskMetricsMap.initializeThreadLocal()
val carbonTaskInfo = new CarbonTaskInfo
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index ce08f8f..bf3f862 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -51,6 +51,8 @@ abstract class CarbonRDD[T: ClassTag](
info
}
+ val inputMetricsInterval: Long = CarbonProperties.getInputMetricsInterval
+
@transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
@@ -73,7 +75,7 @@ abstract class CarbonRDD[T: ClassTag](
TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
- TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+ TaskMetricsMap.initializeThreadLocal()
val carbonTaskInfo = new CarbonTaskInfo
carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 973baa6..73aba46 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -422,7 +422,7 @@ class CarbonScanRDD[T: ClassTag](
val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
TaskMetricsMap.getInstance().registerThreadCallback()
- inputMetricsStats.initBytesReadCallback(context, inputSplit)
+ inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
val iterator = if (inputSplit.getAllSplits.size() > 0) {
val model = format.createQueryModel(inputSplit, attemptContext, filterExpression)
// one query id per table
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 41fc013..69a9bfd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -18,10 +18,10 @@ package org.apache.spark
import java.lang.Long
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.TaskMetricsMap
import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
import org.apache.carbondata.spark.InitInputMetrics
@@ -35,20 +35,28 @@ class CarbonInputMetrics extends InitInputMetrics{
var inputMetrics: InputMetrics = _
// bytes read before compute by other map rdds in lineage
var existingBytesRead: Long = _
+ var recordCount: Long = _
+ var inputMetricsInterval: Long = _
var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
def initBytesReadCallback(context: TaskContext,
- carbonMultiBlockSplit: CarbonMultiBlockSplit) {
+ carbonMultiBlockSplit: CarbonMultiBlockSplit, inputMetricsInterval: Long) {
inputMetrics = context.taskMetrics().inputMetrics
existingBytesRead = inputMetrics.bytesRead
- this.carbonMultiBlockSplit = carbonMultiBlockSplit;
+ recordCount = 0L
+ this.inputMetricsInterval = inputMetricsInterval
+ this.carbonMultiBlockSplit = carbonMultiBlockSplit
}
def incrementRecordRead(recordRead: Long) {
- val value : scala.Long = recordRead
- inputMetrics.incRecordsRead(value)
- if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
- updateBytesRead()
+ val value: scala.Long = recordRead
+ recordCount = recordCount + value
+ if (recordCount > inputMetricsInterval) {
+ inputMetrics.synchronized {
+ inputMetrics.incRecordsRead(recordCount)
+ updateBytesRead()
+ }
+ recordCount = 0L
}
}
@@ -59,10 +67,16 @@ class CarbonInputMetrics extends InitInputMetrics{
}
def updateAndClose() {
+ if (recordCount > 0L) {
+ inputMetrics.synchronized {
+ inputMetrics.incRecordsRead(recordCount)
+ }
+ recordCount = 0L
+ }
// if metrics supported file system ex: hdfs
if (!TaskMetricsMap.getInstance().isCallbackEmpty(Thread.currentThread().getId)) {
updateBytesRead()
- // after update clear parent thread entry from map.
+ // after update clear parent thread entry from map.
TaskMetricsMap.getInstance().removeEntry(Thread.currentThread().getId)
} else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 31d1390..383100f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -338,7 +338,7 @@ class IndexDataMapRebuildRDD[K, V](
val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
if (segment.isDefined) {
- inputMetrics.initBytesReadCallback(context, inputSplit)
+ inputMetrics.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)