Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 31c7b505a -> 3c48df396


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 094b8c3..62d24b7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -31,6 +31,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
@@ -132,6 +134,12 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   public static final String UPADTE_T =
       "mapreduce.input.carboninputformat.partitions.to.prune";
 
+  /**
+   * Attribute for Carbon LOGGER.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonProperties.class.getName());
+
   // a cache for carbon table, it will be used in task side
   private CarbonTable carbonTable;
 
@@ -192,7 +200,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
   }
 
 
-  public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+  public static void setDataMapJob(Configuration configuration, Object dataMapJob)
       throws IOException {
     if (dataMapJob != null) {
       String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
@@ -200,7 +208,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     }
   }
 
-  private static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+  public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
     String jobString = configuration.get(DATA_MAP_DSTR);
     if (jobString != null) {
       return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
@@ -760,11 +768,9 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets;
     if (dataMapJob != null) {
-      DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, BlockletDataMap.NAME,
-              segmentIds, partitionsToPrune,
-              BlockletDataMapFactory.class.getName());
-      prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+      prunedBlocklets =
+          getExtendedBlocklets(job, absoluteTableIdentifier, resolver, segmentIds, blockletMap,
+              dataMapJob, partitionsToPrune);
     } else {
       prunedBlocklets = blockletMap.prune(segmentIds, resolver, partitionsToPrune);
     }
@@ -809,6 +815,23 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     return resultFilterredBlocks;
   }
 
+  public List<ExtendedBlocklet> getExtendedBlocklets(JobContext job,
+      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      List<Segment> segmentIds, TableDataMap blockletMap, DataMapJob dataMapJob,
+      List<PartitionSpec> partitionsToPrune) {
+    List<ExtendedBlocklet> prunedBlocklets = new ArrayList<>();
+    boolean distributedDataMaps = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    if (distributedDataMaps) {
+      String className = "org.apache.carbondata.hadoop.api.DistributableDataMapFormat";
+      FileInputFormat dataMapFormat =
+          createDataMapJob(absoluteTableIdentifier, segmentIds, partitionsToPrune, className);
+      prunedBlocklets = dataMapJob.execute((DistributableDataMapFormat) dataMapFormat, resolver);
+    }
+    return prunedBlocklets;
+  }
+  }
+
   private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
     org.apache.carbondata.hadoop.CarbonInputSplit split =
         org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
@@ -819,6 +842,19 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     return split;
   }
 
+  public static FileInputFormat createDataMapJob(AbsoluteTableIdentifier absoluteTableIdentifier,
+      List<Segment> segments, List<PartitionSpec> partitionsToPrune, String clsName) {
+    try {
+      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
+      return (FileInputFormat) cons
+          .newInstance(absoluteTableIdentifier, BlockletDataMap.NAME, segments, partitionsToPrune,
+              BlockletDataMapFactory.class.getName());
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
   @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
       TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
     Configuration configuration = taskAttemptContext.getConfiguration();
@@ -953,7 +989,16 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     // TODO: currently only batch segment is supported, add support for streaming table
     List<Segment> filteredSegment = getFilteredSegment(job, allSegments.getValidSegments(), false);
 
-    List<ExtendedBlocklet> blocklets = blockletMap.prune(filteredSegment, null, partitions);
+    DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+    List<ExtendedBlocklet> blocklets;
+    if (dataMapJob != null) {
+      blocklets =
+          getExtendedBlocklets(job, identifier, null, filteredSegment, blockletMap, dataMapJob,
+              partitions);
+    } else {
+      blocklets = blockletMap.prune(filteredSegment, null, partitions);
+    }
+
     for (ExtendedBlocklet blocklet : blocklets) {
       String blockName = blocklet.getPath();
       blockName = CarbonTablePath.getCarbonDataFileName(blockName);
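
Note on the change above: getExtendedBlocklets now builds the DistributableDataMapFormat reflectively through createDataMapJob, using a class name string instead of a direct constructor call. Below is a minimal standalone sketch of that reflection pattern; ReflectiveFactoryDemo and Greeting are illustrative stand-ins, not CarbonData classes.

import java.lang.reflect.Constructor;

public final class ReflectiveFactoryDemo {

  // A tiny class with a single constructor, standing in for the reflectively created class.
  public static class Greeting {
    private final String text;
    public Greeting(String text) { this.text = text; }
    @Override public String toString() { return "Greeting(" + text + ")"; }
  }

  // Load clsName and invoke its first declared constructor with the given arguments,
  // mirroring the createDataMapJob method added above.
  public static Object newInstanceByName(String clsName, Object... args) {
    try {
      Constructor<?> cons = Class.forName(clsName).getDeclaredConstructors()[0];
      return cons.newInstance(args);
    } catch (Exception e) {
      // CarbonTableInputFormat logs the failure and returns null; the sketch does the same.
      e.printStackTrace();
      return null;
    }
  }

  public static void main(String[] args) {
    Object g = newInstanceByName(Greeting.class.getName(), "distributed pruning");
    System.out.println(g); // prints: Greeting(distributed pruning)
  }
}

Note that getDeclaredConstructors()[0] assumes the target class exposes a single (or predictably ordered) constructor, which the code above appears to rely on for DistributableDataMapFormat.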

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
index fad2336..3e32ecb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DataMapJob.java
@@ -19,15 +19,21 @@ package org.apache.carbondata.hadoop.api;
 import java.io.Serializable;
 import java.util.List;
 
+import org.apache.carbondata.core.datamap.dev.DataMap;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
 /**
  * Distributable datamap job to execute the #DistributableDataMapFormat in cluster. it prunes the
  * datamaps distributably and returns the final blocklet list
  */
 public interface DataMapJob extends Serializable {
 
+  List<DataMap> execute(CarbonTable carbonTable, FileInputFormat<Void, DataMap> dataMapFormat);
+
   List<ExtendedBlocklet> execute(DistributableDataMapFormat dataMapFormat,
       FilterResolverIntf resolverIntf);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 514428b..56951ac 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -22,6 +22,9 @@ import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.Locale;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
@@ -37,6 +40,7 @@ import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.core.scan.model.QueryDimension;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
 
 import org.apache.hadoop.fs.Path;
@@ -49,6 +53,12 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  */
 public class CarbonInputFormatUtil {
 
+  /**
+   * Attribute for Carbon LOGGER.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonProperties.class.getName());
+
   public static CarbonQueryPlan createQueryPlan(CarbonTable carbonTable, String columnString) {
     String[] columns = null;
     if (columnString != null) {
@@ -89,6 +99,7 @@ public class CarbonInputFormatUtil {
     carbonInputFormat
         .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job, carbonInputFormat);
     return carbonInputFormat;
   }
 
@@ -101,9 +112,42 @@ public class CarbonInputFormatUtil {
     carbonTableInputFormat
         .setTableName(job.getConfiguration(), identifier.getCarbonTableIdentifier().getTableName());
     FileInputFormat.addInputPath(job, new Path(identifier.getTablePath()));
+    setDataMapJobIfConfigured(job, carbonTableInputFormat);
     return carbonTableInputFormat;
   }
 
+  /**
+   * This method sets the DataMapJob if it is configured
+   *
+   * @param job
+   * @throws IOException
+   */
+  private static void setDataMapJobIfConfigured(Job job,
+      CarbonTableInputFormat carbonTableInputFormat) throws IOException {
+    boolean distributedDataMaps = Boolean.parseBoolean(CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+            CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+    if (distributedDataMaps) {
+      String className = "org.apache.carbondata.spark.rdd.SparkDataMapJob";
+      CarbonTableInputFormat.setDataMapJob(job.getConfiguration(), createDataMapJob(className));
+    }
+  }
+
+  /**
+   * Creates an instance of the DataMap job class
+   *
+   * @param className
+   * @return
+   */
+  public static Object createDataMapJob(String className) {
+    try {
+      return Class.forName(className).getDeclaredConstructors()[0].newInstance();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      return null;
+    }
+  }
+
   private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) {
     QueryMeasure queryMeasure = new QueryMeasure(measure.getColName());
     queryMeasure.setQueryOrder(order);
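
The new setDataMapJobIfConfigured helper reads CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP (with its default) and, only when it is true, registers a SparkDataMapJob created by class name. A self-contained sketch of that gate follows; the property key, the system-property lookup, and the NoOpJob class are placeholders rather than the CarbonData constants and classes.

public final class PropertyGatedJobDemo {

  // Stand-in for a job class that would be created by name when the gate is enabled.
  public static class NoOpJob {
    public NoOpJob() { }
    @Override public String toString() { return "NoOpJob"; }
  }

  public static void main(String[] args) throws Exception {
    // Placeholder key and default; CarbonData reads these from CarbonProperties.
    String useDistributedDataMap = System.getProperty("use.distributed.datamap", "false");

    if (Boolean.parseBoolean(useDistributedDataMap)) {
      // Same pattern as CarbonInputFormatUtil.createDataMapJob: first declared
      // constructor, no arguments, and the caller tolerates a null result on failure.
      Object job = Class.forName(NoOpJob.class.getName())
          .getDeclaredConstructors()[0].newInstance();
      System.out.println("registered datamap job: " + job);
    } else {
      System.out.println("distributed datamap pruning disabled; using local pruning");
    }
  }
}

Run with -Duse.distributed.datamap=true to exercise the enabled branch.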

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a62f60a..174e270 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
 import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util}
@@ -452,10 +453,12 @@ class CarbonScanRDD(
     CarbonTableInputFormat.setQuerySegment(conf, identifier)
     CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
     CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
-    if (CarbonProperties.getInstance()
+    val distributedDataMaps = CarbonProperties.getInstance()
       .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
-        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean
+    if (distributedDataMaps) {
+      val className = "org.apache.carbondata.spark.rdd.SparkDataMapJob"
+      CarbonTableInputFormat.setDataMapJob(conf, CarbonInputFormatUtil.createDataMapJob(className))
     }
 
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index 600cd80..d9b9fd6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -29,12 +29,12 @@ import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledExcepti
 
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
-import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
+import org.apache.carbondata.hadoop.api.{AbstractDataMapJob, DistributableDataMapFormat}
 
 /**
  * Spark job to execute datamap job and prune all the datamaps distributable
  */
-class SparkDataMapJob extends DataMapJob {
+class SparkDataMapJob extends AbstractDataMapJob {
 
   override def execute(dataMapFormat: DistributableDataMapFormat,
       resolverIntf: FilterResolverIntf): util.List[ExtendedBlocklet] = {
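
SparkDataMapJob above now extends AbstractDataMapJob, which is part of the commit but not included in this mail. Assuming it is simply an adapter that implements the DataMapJob interface shown earlier and stubs the new DataMap-returning execute variant, its shape would be roughly the sketch below (an assumption, not the actual class from the commit).

package org.apache.carbondata.hadoop.api;

import java.util.List;

import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Sketch only: an adapter so implementations such as SparkDataMapJob need to override
// just the blocklet-pruning execute(DistributableDataMapFormat, FilterResolverIntf).
public abstract class AbstractDataMapJob implements DataMapJob {

  // Default no-op for the new variant added to DataMapJob in this commit.
  @Override public List<DataMap> execute(CarbonTable carbonTable,
      FileInputFormat<Void, DataMap> dataMapFormat) {
    return null;
  }
}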

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index c286c50..bda45dd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -31,10 +31,13 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.optimizer.CarbonFilters
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 
 case class CarbonCountStar(
     attributesRaw: Seq[Attribute],
@@ -74,8 +77,18 @@ case class CarbonCountStar(
     val carbonInputFormat = new CarbonTableInputFormat[Array[Object]]()
     val jobConf: JobConf = new JobConf(new Configuration)
     SparkHadoopUtil.get.addCredentials(jobConf)
+    CarbonTableInputFormat.setDatabaseName(jobConf, absoluteTableIdentifier.getDatabaseName)
+    CarbonTableInputFormat.setTableName(jobConf, absoluteTableIdentifier.getTableName)
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
+    val distributedDataMaps = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+        CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean
+    if (distributedDataMaps) {
+      val className = "org.apache.carbondata.spark.rdd.SparkDataMapJob"
+      CarbonTableInputFormat
+        .setDataMapJob(job.getConfiguration, CarbonInputFormatUtil.createDataMapJob(className))
+    }
     (job, carbonInputFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c48df39/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7ca34af..1694e1c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -95,6 +95,15 @@ object CarbonSetCommand {
           "property should be in \" carbon.input.segments.<database_name>" +
           ".<table_name>=<seg_id list> \" format.")
       }
+    } else if (key.startsWith(CarbonCommonConstants.CARBON_LOAD_DATAMAPS_PARALLEL)) {
+      if (key.split("\\.").length == 6) {
+        sessionParams.addProperty(key.toLowerCase(), value)
+      }
+      else {
+        throw new MalformedCarbonCommandException(
+          "property should be in \" 
carbon.load.datamaps.parallel.<database_name>" +
+          ".<table_name>=<true/false> \" format.")
+      }
     } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) {
       sessionParams.addProperty(key.toLowerCase(), value)
     }
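
The new branch accepts keys of the form carbon.load.datamaps.parallel.<database_name>.<table_name>, i.e. exactly six dot-separated parts. A quick standalone check of that rule, with an illustrative helper that is not CarbonData code:

public final class SessionKeyCheckDemo {

  // Mirrors the split("\\.").length == 6 check in CarbonSetCommand above.
  static boolean isValidDataMapsParallelKey(String key) {
    return key.startsWith("carbon.load.datamaps.parallel")
        && key.split("\\.").length == 6;
  }

  public static void main(String[] args) {
    // carbon.load.datamaps.parallel.<database_name>.<table_name> -> 6 parts -> accepted
    System.out.println(isValidDataMapsParallelKey("carbon.load.datamaps.parallel.default.t1"));
    // table name missing -> 5 parts -> rejected (MalformedCarbonCommandException in CarbonData)
    System.out.println(isValidDataMapsParallelKey("carbon.load.datamaps.parallel.t1"));
  }
}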
