Repository: carbondata Updated Branches: refs/heads/master 315f41c12 -> 44ffaf57e
[CARBONDATA-2071] Added block size to BlockletDataMap while initialising This closes #1851 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/44ffaf57 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/44ffaf57 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/44ffaf57 Branch: refs/heads/master Commit: 44ffaf57ef5dfbe6f73241d1a1e31536c0ae90d3 Parents: 315f41c Author: ravipesala <[email protected]> Authored: Tue Jan 23 18:31:54 2018 +0530 Committer: QiangCai <[email protected]> Committed: Wed Jan 24 16:18:26 2018 +0800 ---------------------------------------------------------------------- .../core/indexstore/BlockMetaInfo.java | 47 ++++++++++++++++++++ .../indexstore/BlockletDataMapIndexStore.java | 19 ++++---- .../core/indexstore/BlockletDetailInfo.java | 13 ++++++ .../blockletindex/BlockletDataMap.java | 33 +++++++++----- .../blockletindex/BlockletDataMapModel.java | 12 ++--- .../hadoop/CarbonMultiBlockSplit.java | 29 ++++++++---- .../streaming/CarbonStreamInputFormatTest.java | 2 +- .../spark/rdd/CarbonIUDMergerRDD.scala | 2 +- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +- .../spark/rdd/CarbonScanPartitionRDD.scala | 2 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 14 +++--- 11 files changed, 132 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java new file mode 100644 index 0000000..6a691d5 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockMetaInfo.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.indexstore; + +/** + * Holds the metadata info of the block. + */ +public class BlockMetaInfo { + + /** + * HDFS locations of a block + */ + private String[] locationInfo; + + /** + * Size of block + */ + private long size; + + public BlockMetaInfo(String[] locationInfo, long size) { + this.locationInfo = locationInfo; + this.size = size; + } + + public String[] getLocationInfo() { + return locationInfo; + } + + public long getSize() { + return size; + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index 8eae974..ad80fd7 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -78,7 +78,7 @@ public class BlockletDataMapIndexStore String segmentPath = 
CarbonTablePath.getSegmentPath( identifier.getAbsoluteTableIdentifier().getTablePath(), identifier.getSegmentId()); - Map<String, String[]> locationMap = new HashMap<>(); + Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); CarbonFile[] carbonFiles = carbonFile.locationAwareListFiles(); SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); @@ -86,9 +86,11 @@ public class BlockletDataMapIndexStore PartitionMapFileStore partitionFileStore = new PartitionMapFileStore(); partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath); for (CarbonFile file : carbonFiles) { - locationMap.put(file.getAbsolutePath(), file.getLocations()); + blockMetaInfoMap + .put(file.getAbsolutePath(), new BlockMetaInfo(file.getLocations(), file.getSize())); } - dataMap = loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, locationMap); + dataMap = + loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap); } catch (MemoryException e) { LOGGER.error("memory exception when loading datamap: " + e.getMessage()); throw new RuntimeException(e.getMessage(), e); @@ -116,7 +118,7 @@ public class BlockletDataMapIndexStore if (missedIdentifiers.size() > 0) { Map<String, SegmentIndexFileStore> segmentIndexFileStoreMap = new HashMap<>(); Map<String, PartitionMapFileStore> partitionFileStoreMap = new HashMap<>(); - Map<String, String[]> locationMap = new HashMap<>(); + Map<String, BlockMetaInfo> blockMetaInfoMap = new HashMap<>(); for (TableBlockIndexUniqueIdentifier identifier: missedIdentifiers) { SegmentIndexFileStore indexFileStore = @@ -136,11 +138,12 @@ public class BlockletDataMapIndexStore partitionFileStore.readAllPartitionsOfSegment(carbonFiles, segmentPath); partitionFileStoreMap.put(identifier.getSegmentId(), partitionFileStore); for (CarbonFile file : carbonFiles) { - locationMap.put(file.getAbsolutePath(), file.getLocations()); + 
blockMetaInfoMap.put(file.getAbsolutePath(), + new BlockMetaInfo(file.getLocations(), file.getSize())); } } blockletDataMaps.add( - loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, locationMap)); + loadAndGetDataMap(identifier, indexFileStore, partitionFileStore, blockMetaInfoMap)); } } } catch (Throwable e) { @@ -192,7 +195,7 @@ public class BlockletDataMapIndexStore TableBlockIndexUniqueIdentifier identifier, SegmentIndexFileStore indexFileStore, PartitionMapFileStore partitionFileStore, - Map<String, String[]> locationMap) + Map<String, BlockMetaInfo> blockMetaInfoMap) throws IOException, MemoryException { String uniqueTableSegmentIdentifier = identifier.getUniqueTableSegmentIdentifier(); @@ -206,7 +209,7 @@ public class BlockletDataMapIndexStore dataMap.init(new BlockletDataMapModel(identifier.getFilePath(), indexFileStore.getFileData(identifier.getCarbonIndexFileName()), partitionFileStore.getPartitions(identifier.getCarbonIndexFileName()), - partitionFileStore.isPartionedSegment(), locationMap)); + partitionFileStore.isPartionedSegment(), blockMetaInfoMap)); lruCache.put(identifier.getUniqueTableSegmentIdentifier(), dataMap, dataMap.getMemorySize()); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java index 5f4224c..ce05fe2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailInfo.java @@ -56,6 +56,8 @@ public class BlockletDetailInfo implements Serializable, Writable { private byte[] columnSchemaBinary; + private long blockSize; + public int getRowCount() { return rowCount; } @@ -104,6 
+106,14 @@ public class BlockletDetailInfo implements Serializable, Writable { this.schemaUpdatedTimeStamp = schemaUpdatedTimeStamp; } + public long getBlockSize() { + return blockSize; + } + + public void setBlockSize(long blockSize) { + this.blockSize = blockSize; + } + @Override public void write(DataOutput out) throws IOException { out.writeInt(rowCount); out.writeShort(pagesCount); @@ -121,6 +131,7 @@ public class BlockletDetailInfo implements Serializable, Writable { out.writeLong(blockFooterOffset); out.writeInt(columnSchemaBinary.length); out.write(columnSchemaBinary); + out.writeLong(blockSize); } @Override public void readFields(DataInput in) throws IOException { @@ -142,6 +153,7 @@ public class BlockletDetailInfo implements Serializable, Writable { byte[] schemaArray = new byte[bytesSize]; in.readFully(schemaArray); readColumnSchema(schemaArray); + blockSize = in.readLong(); } /** @@ -177,6 +189,7 @@ public class BlockletDetailInfo implements Serializable, Writable { detailInfo.blockletInfo = blockletInfo; detailInfo.blockFooterOffset = blockFooterOffset; detailInfo.columnSchemas = columnSchemas; + detailInfo.blockSize = blockSize; return detailInfo; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 7b2c016..b097c66 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -40,6 +40,7 @@ import org.apache.carbondata.core.datamap.dev.DataMapModel; import org.apache.carbondata.core.datastore.IndexKey; import 
org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.indexstore.BlockMetaInfo; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; @@ -103,6 +104,8 @@ public class BlockletDataMap implements DataMap, Cacheable { private static int BLOCKLET_ID_INDEX = 11; + private static int BLOCK_LENGTH = 12; + private static int TASK_MIN_VALUES_INDEX = 0; private static int TASK_MAX_VALUES_INDEX = 1; @@ -146,18 +149,19 @@ public class BlockletDataMap implements DataMap, Cacheable { createSummarySchema(segmentProperties, blockletDataMapInfo.getPartitions(), schemaBinary); } TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); - String[] locations = blockletDataMapInfo.getLocationMap().get(blockInfo.getFilePath()); + BlockMetaInfo blockMetaInfo = + blockletDataMapInfo.getBlockMetaInfoMap().get(blockInfo.getFilePath()); // Here it loads info about all blocklets of index // Only add if the file exists physically. There are scenarios which index file exists inside // merge index but related carbondata files are deleted. 
In that case we first check whether // the file exists physically or not - if (locations != null) { + if (blockMetaInfo != null) { if (fileFooter.getBlockletList() == null) { // This is old store scenario, here blocklet information is not available in index file so // load only block info summaryRow = loadToUnsafeBlock(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, - locations); + blockMetaInfo); } else { // blocklet ID will start from 0 again only when part file path is changed if (null == tempFilePath || !tempFilePath.equals(blockInfo.getFilePath())) { @@ -166,7 +170,7 @@ public class BlockletDataMap implements DataMap, Cacheable { } summaryRow = loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath(), summaryRow, - locations, relativeBlockletId); + blockMetaInfo, relativeBlockletId); // this is done because relative blocklet id need to be incremented based on the // total number of blocklets relativeBlockletId += fileFooter.getBlockletList().size(); @@ -190,7 +194,7 @@ public class BlockletDataMap implements DataMap, Cacheable { private DataMapRowImpl loadToUnsafe(DataFileFooter fileFooter, SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, - String[] locations, int relativeBlockletId) { + BlockMetaInfo blockMetaInfo, int relativeBlockletId) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); List<BlockletInfo> blockletList = fileFooter.getBlockletList(); CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); @@ -249,10 +253,12 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setByteArray(serializedData, ordinal++); // Add block footer offset, it is used if we need to read footer of block row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); - setLocations(locations, row, ordinal); + setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); ordinal++; // for relative blockelt id i.e blocklet id that belongs to a particular part 
file - row.setShort((short) relativeBlockletId++, ordinal); + row.setShort((short) relativeBlockletId++, ordinal++); + // Store block size + row.setLong(blockMetaInfo.getSize(), ordinal); unsafeMemoryDMStore.addIndexRowToUnsafe(row); } catch (Exception e) { throw new RuntimeException(e); @@ -276,7 +282,7 @@ public class BlockletDataMap implements DataMap, Cacheable { */ private DataMapRowImpl loadToUnsafeBlock(DataFileFooter fileFooter, SegmentProperties segmentProperties, String filePath, DataMapRowImpl summaryRow, - String[] locations) { + BlockMetaInfo blockMetaInfo) { int[] minMaxLen = segmentProperties.getColumnsValueSize(); BlockletIndex blockletIndex = fileFooter.getBlockletIndex(); CarbonRowSchema[] schema = unsafeMemoryDMStore.getSchema(); @@ -327,12 +333,15 @@ public class BlockletDataMap implements DataMap, Cacheable { row.setLong(fileFooter.getBlockInfo().getTableBlockInfo().getBlockOffset(), ordinal++); try { - setLocations(locations, row, ordinal); + setLocations(blockMetaInfo.getLocationInfo(), row, ordinal); ordinal++; // for relative blocklet id. Value is -1 because in case of old store blocklet info will // not be present in the index file and in that case we will not knwo the total number of // blocklets - row.setShort((short) -1, ordinal); + row.setShort((short) -1, ordinal++); + + // store block size + row.setLong(blockMetaInfo.getSize(), ordinal); unsafeMemoryDMStore.addIndexRowToUnsafe(row); } catch (Exception e) { throw new RuntimeException(e); @@ -535,6 +544,9 @@ public class BlockletDataMap implements DataMap, Cacheable { // for relative blocklet id i.e. blocklet id that belongs to a particular part file. indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.SHORT)); + // for storing block length. 
+ indexSchemas.add(new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.LONG)); + unsafeMemoryDMStore = new UnsafeMemoryDMStore(indexSchemas.toArray(new CarbonRowSchema[indexSchemas.size()])); } @@ -780,6 +792,7 @@ public class BlockletDataMap implements DataMap, Cacheable { blocklet.setDetailInfo(detailInfo); detailInfo.setBlockFooterOffset(row.getLong(BLOCK_FOOTER_OFFSET)); detailInfo.setColumnSchemaBinary(getColumnSchemaBinary()); + detailInfo.setBlockSize(row.getLong(BLOCK_LENGTH)); return blocklet; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java index 85293a1..b3a7f8c 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapModel.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.datamap.dev.DataMapModel; +import org.apache.carbondata.core.indexstore.BlockMetaInfo; /** * It is the model object to keep the information to build or initialize BlockletDataMap. 
@@ -32,15 +33,16 @@ public class BlockletDataMapModel extends DataMapModel { private boolean partitionedSegment; - private Map<String, String[]> locationMap; + private Map<String, BlockMetaInfo> blockMetaInfoMap; public BlockletDataMapModel(String filePath, byte[] fileData, List<String> partitions, - boolean partitionedSegment, Map<String, String[]> locationMap) { + boolean partitionedSegment, + Map<String, BlockMetaInfo> blockMetaInfoMap) { super(filePath); this.fileData = fileData; this.partitions = partitions; this.partitionedSegment = partitionedSegment; - this.locationMap = locationMap; + this.blockMetaInfoMap = blockMetaInfoMap; } public byte[] getFileData() { @@ -55,7 +57,7 @@ public class BlockletDataMapModel extends DataMapModel { return partitionedSegment; } - public Map<String, String[]> getLocationMap() { - return locationMap; + public Map<String, BlockMetaInfo> getBlockMetaInfoMap() { + return blockMetaInfoMap; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index aed3449..7c06b60 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -21,9 +21,10 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.statusmanager.FileFormat; import org.apache.hadoop.io.Writable; @@ -55,15 +56,15 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { length = 0; } - public
CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList, - String[] locations) throws IOException { + public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList, + String[] locations) { this.splitList = splitList; this.locations = locations; calculateLength(); } - public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List<CarbonInputSplit> splitList, - String[] locations, FileFormat fileFormat) throws IOException { + public CarbonMultiBlockSplit(List<CarbonInputSplit> splitList, + String[] locations, FileFormat fileFormat) { this.splitList = splitList; this.locations = locations; this.fileFormat = fileFormat; @@ -79,7 +80,7 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { } @Override - public long getLength() throws IOException, InterruptedException { + public long getLength() { return length; } @@ -89,14 +90,24 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { private void calculateLength() { long total = 0; - for (CarbonInputSplit split : splitList) { - total += split.getLength(); + if (splitList.size() > 0 && splitList.get(0).getDetailInfo() != null) { + Map<String, Long> blockSizes = new HashMap<>(); + for (CarbonInputSplit split : splitList) { + blockSizes.put(split.getBlockPath(), split.getDetailInfo().getBlockSize()); + } + for (Map.Entry<String, Long> entry : blockSizes.entrySet()) { + total += entry.getValue(); + } + } else { + for (CarbonInputSplit split : splitList) { + total += split.getLength(); + } } length = total; } @Override - public String[] getLocations() throws IOException, InterruptedException { + public String[] getLocations() { return locations; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java ---------------------------------------------------------------------- diff --git 
a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java index d7f9ac2..57f488f 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java @@ -74,7 +74,7 @@ public class CarbonStreamInputFormatTest extends TestCase { CarbonInputSplit carbonInputSplit = new CarbonInputSplit(); List<CarbonInputSplit> splitList = new ArrayList<>(); splitList.add(carbonInputSplit); - return new CarbonMultiBlockSplit(identifier, splitList, new String[] { "localhost" }, + return new CarbonMultiBlockSplit(splitList, new String[] { "localhost" }, FileFormat.ROW_V1); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala index 26fa037..e8180cd 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala @@ -94,7 +94,7 @@ class CarbonIUDMergerRDD[K, V]( val locations = validSplits.head.getLocations i += 1 new CarbonSparkPartition(id, i, - new CarbonMultiBlockSplit(absoluteTableIdentifier, validSplits.asJava, locations)) + new CarbonMultiBlockSplit(validSplits.asJava, locations)) } else { null } http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 48907cb..8d7b044 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -433,7 +433,7 @@ class CarbonMergerRDD[K, V]( if (blockletCount != 0) { val taskInfo = splitInfo.asInstanceOf[CarbonInputSplitTaskInfo] - val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier, + val multiBlockSplit = new CarbonMultiBlockSplit( taskInfo.getCarbonInputSplitList, Array(nodeName)) if (isPartitionTable) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala index 1a8943b..5647427 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -115,7 +115,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) if (blocksPerTask.size() != 0) { val multiBlockSplit = - new CarbonMultiBlockSplit(absoluteTableIdentifier, splits.asJava, Array(node)) + new CarbonMultiBlockSplit(splits.asJava, Array(node)) val partition = new CarbonSparkPartition(id, partition_num, multiBlockSplit) result.add(partition) 
partition_num += 1 http://git-wip-us.apache.org/repos/asf/carbondata/blob/44ffaf57/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index a04e9e1..f2c3060 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -111,7 +111,7 @@ class CarbonScanRDD( val streamPartitions: mutable.Buffer[Partition] = streamSplits.zipWithIndex.map { splitWithIndex => val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, + new CarbonMultiBlockSplit( Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, splitWithIndex._1.getLocations, FileFormat.ROW_V1) @@ -156,7 +156,7 @@ class CarbonScanRDD( (0 until bucketedTable.getNumberOfBuckets).map { bucketId => val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil) val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, + new CarbonMultiBlockSplit( bucketPartitions.asJava, bucketPartitions.flatMap(_.getLocations).toArray) val partition = new CarbonSparkPartition(id, i, multiBlockSplit) @@ -186,7 +186,7 @@ class CarbonScanRDD( val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) if (blocksPerTask.size() != 0) { val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node)) + new CarbonMultiBlockSplit(splits.asJava, Array(node)) val partition = new CarbonSparkPartition(id, i, multiBlockSplit) result.add(partition) i += 1 @@ -200,7 +200,7 @@ class CarbonScanRDD( // Randomize the blocklets for better shuffling Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex => val multiBlockSplit = - new 
CarbonMultiBlockSplit(identifier, + new CarbonMultiBlockSplit( Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava, splitWithIndex._1.getLocations) val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) @@ -215,7 +215,7 @@ class CarbonScanRDD( .map(_.asInstanceOf[CarbonInputSplit]) .groupBy(f => f.getBlockPath) .map { blockSplitEntry => - new CarbonMultiBlockSplit(identifier, + new CarbonMultiBlockSplit( blockSplitEntry._2.asJava, blockSplitEntry._2.flatMap(f => f.getLocations).distinct.toArray) }.toArray.sortBy(_.getLength)(implicitly[Ordering[Long]].reverse) @@ -257,7 +257,7 @@ class CarbonScanRDD( f.getSegmentId.concat(f.getBlockPath) }.values.zipWithIndex.foreach { splitWithIndex => val multiBlockSplit = - new CarbonMultiBlockSplit(identifier, + new CarbonMultiBlockSplit( splitWithIndex._1.asJava, splitWithIndex._1.flatMap(f => f.getLocations).distinct.toArray) val partition = new CarbonSparkPartition(id, splitWithIndex._2, multiBlockSplit) @@ -306,7 +306,7 @@ class CarbonScanRDD( .map(_._1) .toArray - val multiBlockSplit = new CarbonMultiBlockSplit(null, carbonInputSplits.asJava, locations) + val multiBlockSplit = new CarbonMultiBlockSplit(carbonInputSplits.asJava, locations) new CarbonSparkPartition(id, partitionId, multiBlockSplit) }
