[CARBONDATA-2196] Take CarbonTable from loadmodel during streaming ingestion

This closes #1991


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/65234b27
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/65234b27
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/65234b27

Branch: refs/heads/branch-1.3
Commit: 65234b27df8abd16fa3e7e21c0bf72fa5440aa4e
Parents: 6d3105b
Author: rahulforallp <rahul.ku...@knoldus.in>
Authored: Thu Feb 22 18:29:57 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Sat Mar 3 18:05:04 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  6 ++--
 .../hadoop/api/CarbonTableInputFormat.java      |  2 +-
 .../merger/CompactionResultSortProcessor.java   |  6 ++--
 .../util/CarbonDataProcessorUtil.java           | 36 ++++++++++++++++----
 4 files changed, 35 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 6555d6c..020d6c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -176,16 +176,16 @@ public final class TableDataMap extends 
OperationEventListener {
    * @return
    * @throws IOException
    */
-  public List<String> pruneSegments(List<Segment> segments, FilterResolverIntf 
filterExp)
+  public List<Segment> pruneSegments(List<Segment> segments, 
FilterResolverIntf filterExp)
       throws IOException {
-    List<String> prunedSegments = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<Segment> prunedSegments = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     for (Segment segment : segments) {
       List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment);
       for (DataMap dataMap : dataMaps) {
         if (dataMap.isScanRequired(filterExp)) {
           // If any one task in a given segment contains the data that means 
the segment need to
           // be scanned and we need to validate further data maps in the same 
segment
-          prunedSegments.add(segment.getSegmentNo());
+          prunedSegments.add(segment);
           break;
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 96b0b21..3dbf04f 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -910,7 +910,7 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
   /**
    * return valid segment to access
    */
-  private Segment[] getSegmentsToAccess(JobContext job) {
+  public Segment[] getSegmentsToAccess(JobContext job) {
     String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, 
"");
     if (segmentString.trim().isEmpty()) {
       return new Segment[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2fbdf4f..e7c4502 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -430,9 +430,7 @@ public class CompactionResultSortProcessor extends 
AbstractResultProcessor {
    */
   private void initTempStoreLocation() {
     tempStoreLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), 
tableName,
-            carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), 
segmentId,
-            true, false);
+        .getLocalDataFolderLocation(carbonTable, tableName, 
carbonLoadModel.getTaskNo(),
+            carbonLoadModel.getPartitionId(), segmentId, true, false);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/65234b27/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 1e648e1..cfc9fa3 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -117,22 +117,25 @@ public final class CarbonDataProcessorUtil {
       }
     }
   }
+
   /**
+   *
    * This method will form the local data folder store location
    *
-   * @param databaseName
-   * @param tableName
+   * @param carbonTable
    * @param taskId
    * @param partitionId
    * @param segmentId
+   * @param isCompactionFlow
+   * @param isAltPartitionFlow
    * @return
    */
-  public static String[] getLocalDataFolderLocation(String databaseName, 
String tableName,
+  public static String[] getLocalDataFolderLocation(CarbonTable carbonTable, 
String tableName,
       String taskId, String partitionId, String segmentId, boolean 
isCompactionFlow,
       boolean isAltPartitionFlow) {
     String tempLocationKey =
-        getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, 
isCompactionFlow,
-            isAltPartitionFlow);
+        getTempStoreLocationKey(carbonTable.getDatabaseName(), tableName,
+            segmentId, taskId, isCompactionFlow, isAltPartitionFlow);
     String baseTempStorePath = CarbonProperties.getInstance()
         .getProperty(tempLocationKey);
     if (baseTempStorePath == null) {
@@ -145,7 +148,6 @@ public final class CarbonDataProcessorUtil {
     String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, 
File.pathSeparator);
     String[] localDataFolderLocArray = new 
String[baseTmpStorePathArray.length];
 
-    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     for (int i = 0 ; i < baseTmpStorePathArray.length; i++) {
       String tmpStore = baseTmpStorePathArray[i];
       CarbonTablePath carbonTablePath =
@@ -159,6 +161,26 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
+   * This method will form the local data folder store location
+   *
+   * @param databaseName
+   * @param tableName
+   * @param taskId
+   * @param partitionId
+   * @param segmentId
+   * @param isCompactionFlow
+   * @param isAltPartitionFlow
+   * @return
+   */
+  public static String[] getLocalDataFolderLocation(String databaseName, 
String tableName,
+      String taskId, String partitionId, String segmentId, boolean 
isCompactionFlow,
+      boolean isAltPartitionFlow) {
+    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    return getLocalDataFolderLocation(carbonTable, tableName, taskId, 
partitionId,
+        segmentId, isCompactionFlow, isAltPartitionFlow);
+  }
+
+  /**
    * This method will form the key for getting the temporary location set in 
carbon properties
    *
    * @param databaseName
@@ -587,4 +609,4 @@ public final class CarbonDataProcessorUtil {
     return isRawDataRequired;
   }
 
-}
\ No newline at end of file
+}

Reply via email to