This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new bbeb974  [CARBONDATA-3487] Wrong input metrics (size/record) displayed in Spark UI during insert into
bbeb974 is described below

commit bbeb974e22d1b87670926d83dcac30c5ef139856
Author: ajantha-bhat <[email protected]>
AuthorDate: Thu Aug 1 17:16:44 2019 +0530

    [CARBONDATA-3487] Wrong input metrics (size/record) displayed in Spark UI during insert into
    
    Problem: wrong input metrics (size/record) are displayed in the Spark UI during
    insert into.
    
    Cause:
    a) The size metric was wrong because the update never reached the Spark input
    metrics: NewDataFrameRDD set the thread-local id, which was then overwritten by
    the ScanRDD thread-local id because of the multi-level RDD (a generic sketch of
    this override appears after this list).
    b) The record metric was wrong because, during insert into, ScanRDD is called by
    multiple threads, and there was no synchronisation even though a single
    task-level Spark input metric is shared by all the concurrent ScanRDD instances.
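    
    A minimal, hypothetical illustration of the failure mode in (a) (plain Java, not
    CarbonData code): a value set in an InheritableThreadLocal by the parent thread is
    inherited by a spawned child thread, but an unconditional re-set in the child (the
    old behaviour) replaces the parent id with the child id.
    
        import java.util.concurrent.ExecutorService;
        import java.util.concurrent.Executors;
    
        public class ThreadLocalOverrideDemo {
          // Holds the parent thread id; InheritableThreadLocal passes it to spawned children.
          private static final InheritableThreadLocal<Long> PARENT_ID =
              new InheritableThreadLocal<>();
    
          public static void main(String[] args) throws Exception {
            PARENT_ID.set(Thread.currentThread().getId());  // parent registers its own id
            ExecutorService pool = Executors.newSingleThreadExecutor();
            pool.submit(() -> {
              System.out.println("inherited:    " + PARENT_ID.get());  // parent id
              PARENT_ID.set(Thread.currentThread().getId());           // unconditional re-set
              System.out.println("after re-set: " + PARENT_ID.get());  // parent id is lost
            }).get();
            pool.shutdown();
          }
        }
    
    The patch avoids this by setting the thread local only when it is not already set
    (see TaskMetricsMap.initializeThreadLocal in the diff below).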
    
    Solution:
    a) To fix the size metric, set the thread local in the parent RDD and do not set
    it again in a child RDD if it is already set.
    b) To fix the record metric, add synchronisation around the record metric update
    and reduce its frequency by flushing only after a configurable interval (a usage
    sketch of the new interval property follows).
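    
    A minimal usage sketch of the new property (assumption: standard
    CarbonProperties.addProperty usage; the value 1000000 is only an example, the
    default is 500000 records):
    
        import org.apache.carbondata.core.constants.CarbonCommonConstants;
        import org.apache.carbondata.core.util.CarbonProperties;
    
        // Controls how many records a task accumulates locally before the shared
        // Spark input metrics are updated under synchronisation.
        CarbonProperties.getInstance().addProperty(
            CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL, "1000000");
    
    Because the property is marked dynamicConfigurable, it should also be settable per
    session (for example through the SET command), and invalid or negative values fall
    back to the default.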
    
    This closes #3345
---
 .../core/constants/CarbonCommonConstants.java      | 10 ++++++++
 .../carbondata/core/util/CarbonProperties.java     | 24 +++++++++++++++++
 .../carbondata/core/util/TaskMetricsMap.java       | 21 ++++++++++++++-
 .../apache/carbondata/spark/InitInputMetrics.java  |  2 +-
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |  2 +-
 .../apache/carbondata/spark/rdd/CarbonRDD.scala    |  4 ++-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  2 +-
 .../org/apache/spark/CarbonInputMetrics.scala      | 30 ++++++++++++++++------
 .../datamap/IndexDataMapRebuildRDD.scala           |  2 +-
 9 files changed, 83 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 510bcee..17b191d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1307,6 +1307,16 @@ public final class CarbonCommonConstants {
   public static final String IS_DRIVER_INSTANCE_DEFAULT = "false";
 
   /**
+   * Property to set the input metrics update interval (in record count). After every interval,
+   * input metrics are updated to Spark; otherwise they are updated only at the end of the query.
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String INPUT_METRICS_UPDATE_INTERVAL = "carbon.input.metrics.update.interval";
+
+  public static final Long INPUT_METRICS_UPDATE_INTERVAL_DEFAULT = 500000L;
+
+
+  /**
    * property for enabling unsafe based query processing
    */
   @CarbonProperty(dynamicConfigurable = true)
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index 5868664..c60dad8 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1749,4 +1749,28 @@ public final class CarbonProperties {
     }
     return numOfThreadsForPruning;
   }
+
+  /**
+   * Validate and get the input metrics interval
+   *
+   * @return input metrics interval
+   */
+  public static Long getInputMetricsInterval() {
+    String metrics = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL);
+    if (metrics == null) {
+      return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+    } else {
+      try {
+        long configuredValue = Long.parseLong(metrics);
+        if (configuredValue < 0) {
+          return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+        } else {
+          return configuredValue;
+        }
+      } catch (Exception ex) {
+        return CarbonCommonConstants.INPUT_METRICS_UPDATE_INTERVAL_DEFAULT;
+      }
+    }
+  }
 }
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
index 196fd64..9d4e11c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TaskMetricsMap.java
@@ -36,7 +36,7 @@ public class TaskMetricsMap {
   private static final Logger LOGGER =
       LogServiceFactory.getLogService(TaskMetricsMap.class.getName());
 
-  public static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
+  private static final InheritableThreadLocal<Long> threadLocal = new InheritableThreadLocal<>();
   /**
   * In this map we are maintaining all spawned child threads callback info for each parent thread
   * here key = parent thread id & values =  list of spawned child threads callbacks
@@ -50,6 +50,25 @@ public class TaskMetricsMap {
     return taskMetricsMap;
   }
 
+  public static InheritableThreadLocal<Long> getThreadLocal() {
+    return threadLocal;
+  }
+
+
+  /**
+   * initializes thread local to current thread id
+   *
+   * @return
+   */
+  public static void initializeThreadLocal() {
+    // In case of multi level RDD (say insert into scenario, where DataFrameRDD calling ScanRDD)
+    // parent thread id should not be overwritten by child thread id.
+    // so don't set if it is already set.
+    if (threadLocal.get() == null) {
+      threadLocal.set(Thread.currentThread().getId());
+    }
+  }
+
   /**
    * registers current thread callback using parent thread id
    *
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
index 8574a3a..6e4ab40 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/InitInputMetrics.java
@@ -28,5 +28,5 @@ import org.apache.spark.TaskContext;
  */
 public interface InitInputMetrics extends InputMetricsStats {
 
-  void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit);
+  void initBytesReadCallback(TaskContext context, CarbonMultiBlockSplit carbonMultiBlockSplit, Long inputMetricsInterval);
 }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 81699b4..ea83827 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -364,7 +364,7 @@ object DataLoadProcessBuilderOnSpark {
     TaskContext.get.addTaskCompletionListener { _ =>
       CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
     }
-    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    TaskMetricsMap.initializeThreadLocal()
     val carbonTaskInfo = new CarbonTaskInfo
     carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
index ce08f8f..bf3f862 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonRDD.scala
@@ -51,6 +51,8 @@ abstract class CarbonRDD[T: ClassTag](
     info
   }
 
+  val inputMetricsInterval: Long = CarbonProperties.getInputMetricsInterval
+
   @transient val hadoopConf = SparkSQLUtil.sessionState(ss).newHadoopConf()
 
   val config = SparkSQLUtil.broadCastHadoopConf(sparkContext, hadoopConf)
@@ -73,7 +75,7 @@ abstract class CarbonRDD[T: ClassTag](
     TaskContext.get.addTaskCompletionListener(_ => ThreadLocalSessionInfo.unsetAll())
     carbonSessionInfo.getNonSerializableExtraInfo.put("carbonConf", getConf)
     ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-    TaskMetricsMap.threadLocal.set(Thread.currentThread().getId)
+    TaskMetricsMap.initializeThreadLocal()
     val carbonTaskInfo = new CarbonTaskInfo
     carbonTaskInfo.setTaskId(CarbonUtil.generateUUID())
     ThreadLocalTaskInfo.setCarbonTaskInfo(carbonTaskInfo)
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 973baa6..73aba46 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -422,7 +422,7 @@ class CarbonScanRDD[T: ClassTag](
     val format = prepareInputFormatForExecutor(attemptContext.getConfiguration)
     val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value
     TaskMetricsMap.getInstance().registerThreadCallback()
-    inputMetricsStats.initBytesReadCallback(context, inputSplit)
+    inputMetricsStats.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
     val iterator = if (inputSplit.getAllSplits.size() > 0) {
       val model = format.createQueryModel(inputSplit, attemptContext, filterExpression)
       // one query id per table
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
index 41fc013..69a9bfd 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/CarbonInputMetrics.scala
@@ -18,10 +18,10 @@ package org.apache.spark
 
 import java.lang.Long
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.TaskMetricsMap
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
 import org.apache.carbondata.spark.InitInputMetrics
@@ -35,20 +35,28 @@ class CarbonInputMetrics extends InitInputMetrics{
     var inputMetrics: InputMetrics = _
     // bytes read before compute by other map rdds in lineage
     var existingBytesRead: Long = _
+    var recordCount: Long = _
+    var inputMetricsInterval: Long = _
     var carbonMultiBlockSplit: CarbonMultiBlockSplit = _
 
   def initBytesReadCallback(context: TaskContext,
-      carbonMultiBlockSplit: CarbonMultiBlockSplit) {
+      carbonMultiBlockSplit: CarbonMultiBlockSplit, inputMetricsInterval: Long) {
     inputMetrics = context.taskMetrics().inputMetrics
     existingBytesRead = inputMetrics.bytesRead
-    this.carbonMultiBlockSplit = carbonMultiBlockSplit;
+    recordCount = 0L
+    this.inputMetricsInterval = inputMetricsInterval
+    this.carbonMultiBlockSplit = carbonMultiBlockSplit
   }
 
   def incrementRecordRead(recordRead: Long) {
-    val value : scala.Long = recordRead
-    inputMetrics.incRecordsRead(value)
-    if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
-      updateBytesRead()
+    val value: scala.Long = recordRead
+    recordCount = recordCount + value
+    if (recordCount > inputMetricsInterval) {
+      inputMetrics.synchronized {
+        inputMetrics.incRecordsRead(recordCount)
+        updateBytesRead()
+      }
+      recordCount = 0L
     }
   }
 
@@ -59,10 +67,16 @@ class CarbonInputMetrics extends InitInputMetrics{
   }
 
   def updateAndClose() {
+    if (recordCount > 0L) {
+      inputMetrics.synchronized {
+        inputMetrics.incRecordsRead(recordCount)
+      }
+      recordCount = 0L
+    }
     // if metrics supported file system ex: hdfs
    if (!TaskMetricsMap.getInstance().isCallbackEmpty(Thread.currentThread().getId)) {
       updateBytesRead()
-     // after update clear parent thread entry from map.
+      // after update clear parent thread entry from map.
       TaskMetricsMap.getInstance().removeEntry(Thread.currentThread().getId)
     } else if (carbonMultiBlockSplit.isInstanceOf[CarbonMultiBlockSplit]) {
      // If we can't get the bytes read from the FS stats, fall back to the split size,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 31d1390..383100f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -338,7 +338,7 @@ class IndexDataMapRebuildRDD[K, V](
     val segmentId = inputSplit.getAllSplits.get(0).getSegment.getSegmentNo
     val segment = segments.find(p => p.getSegmentNo.equals(segmentId))
     if (segment.isDefined) {
-      inputMetrics.initBytesReadCallback(context, inputSplit)
+      inputMetrics.initBytesReadCallback(context, inputSplit, inputMetricsInterval)
 
       val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
       val attemptContext = new TaskAttemptContextImpl(FileFactory.getConfiguration, attemptId)
