Repository: carbondata
Updated Branches:
  refs/heads/master 315f41c12 -> 44ffaf57e


[CARBONDATA-2071] Added block size to BlockletDataMap while initialising

This closes #1851


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/44ffaf57
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/44ffaf57
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/44ffaf57

Branch: refs/heads/master
Commit: 44ffaf57ef5dfbe6f73241d1a1e31536c0ae90d3
Parents: 315f41c
Author: ravipesala <[email protected]>
Authored: Tue Jan 23 18:31:54 2018 +0530
Committer: QiangCai <[email protected]>
Committed: Wed Jan 24 16:18:26 2018 +0800

----------------------------------------------------------------------
 .../core/indexstore/BlockMetaInfo.java          | 47 ++++++++++++++++++++
 .../indexstore/BlockletDataMapIndexStore.java   | 19 ++++----
 .../core/indexstore/BlockletDetailInfo.java     | 13 ++++++
 .../blockletindex/BlockletDataMap.java          | 33 +++++++++-----
 .../blockletindex/BlockletDataMapModel.java     | 12 ++---
 .../hadoop/CarbonMultiBlockSplit.java           | 29 ++++++++----
 .../streaming/CarbonStreamInputFormatTest.java  |  2 +-
 .../spark/rdd/CarbonIUDMergerRDD.scala          |  2 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  2 +-
 .../spark/rdd/CarbonScanPartitionRDD.scala      |  2 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 14 +++---
 11 files changed, 132 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java
new file mode 100644
index 0000000..6a691d5
--- /dev/null
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.indexstore;
+
+/**
+ * Holds the metadata info of the block.
+ */
+public class BlockMetaInfo {
+
+  /**
+   * HDFS locations of a block
+   */
+  private String[] locationInfo;
+
+  /**
+   * Size of block
+   */
+  private long size;
+
+  public BlockMetaInfo(String[] locationInfo, long size) {
+    this.locationInfo = locationInfo;
+    this.size = size;
+  }
+
+  public String[] getLocationInfo() {
+    return locationInfo;
+  }
+
+  public long getSize() {
+    return size;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
index 8eae974..ad80fd7 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java
@@ -78,7 +78,7 @@ public class BlockletDataMapIndexStore
         String segmentPath = CarbonTablePath.getSegmentPath(
             identifier.getAbsoluteTableIdentifier().getTablePath(),
             identifier.getSegmentId());
-        Map<String, String[]> locationMap = new HashMap<>();
+        Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
         CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
         CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles();
         SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
@@ -86,9 +86,11 @@ public class BlockletDataMapIndexStore
         PartitionMapFileStore partitionFileStore = new PartitionMapFileStore();
         partitionFileStore.readAllPartitionsOfSegment(carbonFiles, 
segmentPath);
         for (CarbonFile file : carbonFiles) {
-          locationMap.put(file.getAbsolutePath(), file.getLocations());
+          blockMetaInfoMap
+              .put(file.getAbsolutePath(), new 
BlockMetaInfo(file.getLocations(), file.getSize()));
         }
-        dataMap = loadAndGetDataMap(identifier, indexFileStore, 
partitionFileStore, locationMap);
+        dataMap =
+            loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, 
blockMetaInfoMap);
       } catch (MemoryException e) {
         LOGGER.error("memory exception when loading datamap: " + 
e.getMessage());
         throw new RuntimeException(e.getMessage(), e);
@@ -116,7 +118,7 @@ public class BlockletDataMapIndexStore
       if (missedIdentifiers.size() > 0) {
         Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new 
HashMap<>();
         Map<String, PartitionMapFileStore> partitionFileStoreMap = new 
HashMap<>();
-        Map<String, String[]> locationMap = new HashMap<>();
+        Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>();
 
         for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) {
           SegmentIndexFileStore indexFileStore =
@@ -136,11 +138,12 @@ public class BlockletDataMapIndexStore
             partitionFileStore.readAllPartitionsOfSegment(carbonFiles, 
segmentPath);
             partitionFileStoreMap.put(identifier.getSegmentId(), 
partitionFileStore);
             for (CarbonFile file : carbonFiles) {
-              locationMap.put(file.getAbsolutePath(), file.getLocations());
+              blockMetaInfoMap.put(file.getAbsolutePath(),
+                  new BlockMetaInfo(file.getLocations(), file.getSize()));
             }
           }
           blockletDataMaps.add(
-              loadAndGetDataMap(identifier, indexFileStore, 
partitionFileStore, locationMap));
+              loadAndGetDataMap(identifier, indexFileStore, 
partitionFileStore, blockMetaInfoMap));
         }
       }
     } catch (Throwable e) {
@@ -192,7 +195,7 @@ public class BlockletDataMapIndexStore
       TableBlockIndexUniqueIdentifier identifier,
       SegmentIndexFileStore indexFileStore,
       PartitionMapFileStore partitionFileStore,
-      Map<String, String[]> locationMap)
+      Map<String, BlockMetaInfo> blockMetaInfoMap)
       throws IOException, MemoryException {
     String uniqueTableSegmentIdentifier =
         identifier.getUniqueTableSegmentIdentifier();
@@ -206,7 +209,7 @@ public class BlockletDataMapIndexStore
       dataMap.init(new BlockletDataMapModel(identifier.getFilePath(),
           indexFileStore.getFileData(identifier.getCarbonIndexFileName()),
           
partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()),
-          partitionFileStore.isPartionedSegment(), locationMap));
+          partitionFileStore.isPartionedSegment(), blockMetaInfoMap));
       lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap,
           dataMap.getMemorySize());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
index 5f4224c..ce05fe2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java
@@ -56,6 +56,8 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
 
   private byte[] columnSchemaBinary;
 
+  private long blockSize;
+
   public int getRowCount() {
     return rowCount;
   }
@@ -104,6 +106,14 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp;
   }
 
+  public long getBlockSize() {
+    return blockSize;
+  }
+
+  public void setBlockSize(long blockSize) {
+    this.blockSize = blockSize;
+  }
+
   @Override public void write(DataOutput out) throws IOException {
     out.writeInt(rowCount);
     out.writeShort(pagesCount);
@@ -121,6 +131,7 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     out.writeLong(blockFooterOffset);
     out.writeInt(columnSchemaBinary.length);
     out.write(columnSchemaBinary);
+    out.writeLong(blockSize);
   }
 
   @Override public void readFields(DataInput in) throws IOException {
@@ -142,6 +153,7 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     byte[] schemaArray = new byte[bytesSize];
     in.readFully(schemaArray);
     readColumnSchema(schemaArray);
+    blockSize = in.readLong();
   }
 
   /**
@@ -177,6 +189,7 @@ public class BlockletDetailInfo implements Serializable, 
Writable {
     detailInfo.blockletInfo = blockletInfo;
     detailInfo.blockFooterOffset = blockFooterOffset;
     detailInfo.columnSchemas = columnSchemas;
+    detailInfo.blockSize = blockSize;
     return detailInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 7b2c016..b097c66 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
@@ -103,6 +104,8 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private static int BLOCKLET_ID_INDEX = 11;
 
+  private static int BLOCK_LENGTH = 12;
+
   private static int TASK_MIN_VALUES_INDEX = 0;
 
   private static int TASK_MAX_VALUES_INDEX = 1;
@@ -146,18 +149,19 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
         createSummarySchema(segmentProperties, 
blockletDataMapInfo.getPartitions(), schemaBinary);
       }
       TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo();
-      String[] locations = 
blockletDataMapInfo.getLocationMap().get(blockInfo.getFilePath());
+      BlockMetaInfo blockMetaInfo =
+          
blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath());
       // Here it loads info about all blocklets of index
       // Only add if the file exists physically. There are scenarios which 
index file exists inside
       // merge index but related carbondata files are deleted. In that case we 
first check whether
       // the file exists physically or not
-      if (locations != null) {
+      if (blockMetaInfo != null) {
         if (fileFooter.getBlockletList() == null) {
           // This is old store scenario, here blocklet information is not 
available in index file so
           // load only block info
           summaryRow =
               loadToUnsafeBlock(fileFooter, segmentProperties, 
blockInfo.getFilePath(), summaryRow,
-                  locations);
+                  blockMetaInfo);
         } else {
           // blocklet ID will start from 0 again only when part file path is 
changed
           if (null == tempFilePath || 
!tempFilePath.equals(blockInfo.getFilePath())) {
@@ -166,7 +170,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
           }
           summaryRow =
               loadToUnsafe(fileFooter, segmentProperties, 
blockInfo.getFilePath(), summaryRow,
-                  locations, relativeBlockletId);
+                  blockMetaInfo, relativeBlockletId);
           // this is done because relative blocklet id need to be incremented 
based on the
           // total number of blocklets
           relativeBlockletId += fileFooter.getBlockletList().size();
@@ -190,7 +194,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
 
   private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter,
       SegmentProperties segmentProperties, String filePath, DataMapRowImpl 
summaryRow,
-      String[] locations, int relativeBlockletId) {
+      BlockMetaInfo blockMetaInfo, int relativeBlockletId) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     List<BlockletInfo> blockletList = fileFooter.getBlockletList();
     CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
@@ -249,10 +253,12 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
         row.setByteArray(serializedData, ordinal++);
         // Add block footer offset, it is used if we need to read footer of 
block
         
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), 
ordinal++);
-        setLocations(locations, row, ordinal);
+        setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
         ordinal++;
         // for relative blockelt id i.e blocklet id that belongs to a 
particular part file
-        row.setShort((short) relativeBlockletId++, ordinal);
+        row.setShort((short) relativeBlockletId++, ordinal++);
+        // Store block size
+        row.setLong(blockMetaInfo.getSize(), ordinal);
         unsafeMemoryDMStore.addIndexRowToUnsafe(row);
       } catch (Exception e) {
         throw new RuntimeException(e);
@@ -276,7 +282,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
    */
   private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter,
       SegmentProperties segmentProperties, String filePath, DataMapRowImpl 
summaryRow,
-      String[] locations) {
+      BlockMetaInfo blockMetaInfo) {
     int[] minMaxLen = segmentProperties.getColumnsValueSize();
     BlockletIndex blockletIndex = fileFooter.getBlockletIndex();
     CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema();
@@ -327,12 +333,15 @@ public class BlockletDataMap implements DataMap, 
Cacheable {
 
     
row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), 
ordinal++);
     try {
-      setLocations(locations, row, ordinal);
+      setLocations(blockMetaInfo.getLocationInfo(), row, ordinal);
       ordinal++;
       // for relative blocklet id. Value is -1 because in case of old store 
blocklet info will
       // not be present in the index file and in that case we will not knwo 
the total number of
       // blocklets
-      row.setShort((short) -1, ordinal);
+      row.setShort((short) -1, ordinal++);
+
+      // store block size
+      row.setLong(blockMetaInfo.getSize(), ordinal);
       unsafeMemoryDMStore.addIndexRowToUnsafe(row);
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -535,6 +544,9 @@ public class BlockletDataMap implements DataMap, Cacheable {
     // for relative blocklet id i.e. blocklet id that belongs to a particular 
part file.
     indexSchemas.add(new 
CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT));
 
+    // for storing block length.
+    indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG));
+
     unsafeMemoryDMStore =
         new UnsafeMemoryDMStore(indexSchemas.toArray(new 
CarbonRowSchema[indexSchemas.size()]));
   }
@@ -780,6 +792,7 @@ public class BlockletDataMap implements DataMap, Cacheable {
     blocklet.setDetailInfo(detailInfo);
     detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET));
     detailInfo.setColumnSchemaBinary(getColumnSchemaBinary());
+    detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH));
     return blocklet;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
index 85293a1..b3a7f8c 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java
@@ -20,6 +20,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
+import org.apache.carbondata.core.indexstore.BlockMetaInfo;
 
 /**
  * It is the model object to keep the information to build or initialize 
BlockletDataMap.
@@ -32,15 +33,16 @@ public class BlockletDataMapModel extends DataMapModel {
 
   private boolean partitionedSegment;
 
-  private Map<String, String[]> locationMap;
+  Map<String, BlockMetaInfo> blockMetaInfoMap;
 
   public BlockletDataMapModel(String filePath, byte[] fileData, List<String> 
partitions,
-      boolean partitionedSegment, Map<String, String[]> locationMap) {
+      boolean partitionedSegment,
+      Map<String, BlockMetaInfo> blockMetaInfoMap) {
     super(filePath);
     this.fileData = fileData;
     this.partitions = partitions;
     this.partitionedSegment = partitionedSegment;
-    this.locationMap = locationMap;
+    this.blockMetaInfoMap = blockMetaInfoMap;
   }
 
   public byte[] getFileData() {
@@ -55,7 +57,7 @@ public class BlockletDataMapModel extends DataMapModel {
     return partitionedSegment;
   }
 
-  public Map<String, String[]> getLocationMap() {
-    return locationMap;
+  public Map<String, BlockMetaInfo> getBlockMetaInfoMap() {
+    return blockMetaInfoMap;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
index aed3449..7c06b60 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java
@@ -21,9 +21,10 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 
 import org.apache.hadoop.io.Writable;
@@ -55,15 +56,15 @@ public class CarbonMultiBlockSplit extends InputSplit 
implements Writable {
     length = 0;
   }
 
-  public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, 
List<CarbonInputSplit> splitList,
-      String[] locations) throws IOException {
+  public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList,
+      String[] locations) {
     this.splitList = splitList;
     this.locations = locations;
     calculateLength();
   }
 
-  public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, 
List<CarbonInputSplit> splitList,
-      String[] locations, FileFormat fileFormat) throws IOException {
+  public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList,
+      String[] locations, FileFormat fileFormat) {
     this.splitList = splitList;
     this.locations = locations;
     this.fileFormat = fileFormat;
@@ -79,7 +80,7 @@ public class CarbonMultiBlockSplit extends InputSplit 
implements Writable {
   }
 
   @Override
-  public long getLength() throws IOException, InterruptedException {
+  public long getLength() {
     return length;
   }
 
@@ -89,14 +90,24 @@ public class CarbonMultiBlockSplit extends InputSplit 
implements Writable {
 
   private void calculateLength() {
     long total = 0;
-    for (CarbonInputSplit split : splitList) {
-      total += split.getLength();
+    if (splitList.size() > 0 && splitList.get(0).getDetailInfo() != null) {
+      Map<String, Long> blockSizes = new HashMap<>();
+      for (CarbonInputSplit split : splitList) {
+        blockSizes.put(split.getBlockPath(), 
split.getDetailInfo().getBlockSize());
+      }
+      for (Map.Entry<String, Long> entry : blockSizes.entrySet()) {
+        total += entry.getValue();
+      }
+    } else {
+      for (CarbonInputSplit split : splitList) {
+        total += split.getLength();
+      }
     }
     length = total;
   }
 
   @Override
-  public String[] getLocations() throws IOException, InterruptedException {
+  public String[] getLocations() {
     return locations;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
index d7f9ac2..57f488f 100644
--- 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ 
b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -74,7 +74,7 @@ public class CarbonStreamInputFormatTest extends TestCase {
     CarbonInputSplit carbonInputSplit = new CarbonInputSplit();
     List<CarbonInputSplit> splitList = new ArrayList<>();
     splitList.add(carbonInputSplit);
-    return new CarbonMultiBlockSplit(identifier, splitList, new String[] { 
"localhost" },
+    return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" },
         FileFormat.ROW_V1);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 26fa037..e8180cd 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -94,7 +94,7 @@ class CarbonIUDMergerRDD[K, V](
         val locations = validSplits.head.getLocations
         i += 1
         new CarbonSparkPartition(id, i,
-          new CarbonMultiBlockSplit(absoluteTableIdentifier, 
validSplits.asJava, locations))
+          new CarbonMultiBlockSplit(validSplits.asJava, locations))
       } else {
         null
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 48907cb..8d7b044 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -433,7 +433,7 @@ class CarbonMergerRDD[K, V](
 
         if (blockletCount != 0) {
           val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo]
-          val multiBlockSplit = new 
CarbonMultiBlockSplit(absoluteTableIdentifier,
+          val multiBlockSplit = new CarbonMultiBlockSplit(
             taskInfo.getCarbonInputSplitList,
             Array(nodeName))
           if (isPartitionTable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 1a8943b..5647427 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -115,7 +115,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: 
AlterPartitionModel,
           val splits = 
blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
           if (blocksPerTask.size() != 0) {
             val multiBlockSplit =
-              new CarbonMultiBlockSplit(absoluteTableIdentifier, 
splits.asJava, Array(node))
+              new CarbonMultiBlockSplit(splits.asJava, Array(node))
             val partition = new CarbonSparkPartition(id, partition_num, 
multiBlockSplit)
             result.add(partition)
             partition_num += 1

http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index a04e9e1..f2c3060 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -111,7 +111,7 @@ class CarbonScanRDD(
       val streamPartitions: mutable.Buffer[Partition] =
         streamSplits.zipWithIndex.map { splitWithIndex =>
           val multiBlockSplit =
-            new CarbonMultiBlockSplit(identifier,
+            new CarbonMultiBlockSplit(
               Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
               splitWithIndex._1.getLocations,
               FileFormat.ROW_V1)
@@ -156,7 +156,7 @@ class CarbonScanRDD(
         (0 until bucketedTable.getNumberOfBuckets).map { bucketId =>
           val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil)
           val multiBlockSplit =
-            new CarbonMultiBlockSplit(identifier,
+            new CarbonMultiBlockSplit(
               bucketPartitions.asJava,
               bucketPartitions.flatMap(_.getLocations).toArray)
           val partition = new CarbonSparkPartition(id, i, multiBlockSplit)
@@ -186,7 +186,7 @@ class CarbonScanRDD(
               val splits = 
blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit])
               if (blocksPerTask.size() != 0) {
                 val multiBlockSplit =
-                  new CarbonMultiBlockSplit(identifier, splits.asJava, 
Array(node))
+                  new CarbonMultiBlockSplit(splits.asJava, Array(node))
                 val partition = new CarbonSparkPartition(id, i, 
multiBlockSplit)
                 result.add(partition)
                 i += 1
@@ -200,7 +200,7 @@ class CarbonScanRDD(
           // Randomize the blocklets for better shuffling
           Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex 
=>
             val multiBlockSplit =
-              new CarbonMultiBlockSplit(identifier,
+              new CarbonMultiBlockSplit(
                 Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
                 splitWithIndex._1.getLocations)
             val partition = new CarbonSparkPartition(id, splitWithIndex._2, 
multiBlockSplit)
@@ -215,7 +215,7 @@ class CarbonScanRDD(
             .map(_.asInstanceOf[CarbonInputSplit])
             .groupBy(f => f.getBlockPath)
             .map { blockSplitEntry =>
-              new CarbonMultiBlockSplit(identifier,
+              new CarbonMultiBlockSplit(
                 blockSplitEntry._2.asJava,
                 blockSplitEntry._2.flatMap(f => 
f.getLocations).distinct.toArray)
             }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse)
@@ -257,7 +257,7 @@ class CarbonScanRDD(
             f.getSegmentId.concat(f.getBlockPath)
           }.values.zipWithIndex.foreach { splitWithIndex =>
             val multiBlockSplit =
-              new CarbonMultiBlockSplit(identifier,
+              new CarbonMultiBlockSplit(
                 splitWithIndex._1.asJava,
                 splitWithIndex._1.flatMap(f => 
f.getLocations).distinct.toArray)
             val partition = new CarbonSparkPartition(id, splitWithIndex._2, 
multiBlockSplit)
@@ -306,7 +306,7 @@ class CarbonScanRDD(
       .map(_._1)
       .toArray
 
-    val multiBlockSplit = new CarbonMultiBlockSplit(null, 
carbonInputSplits.asJava, locations)
+    val multiBlockSplit = new CarbonMultiBlockSplit(carbonInputSplits.asJava, 
locations)
     new CarbonSparkPartition(id, partitionId, multiBlockSplit)
   }
 

Reply via email to