[CARBONDATA-1363] Add DataMapWriter interface This closes #1238
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f089287c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f089287c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f089287c Branch: refs/heads/master Commit: f089287cef1d685b81e8fa26868325503acdb635 Parents: 85cbad2 Author: Jacky Li <[email protected]> Authored: Thu Aug 10 13:36:18 2017 +0800 Committer: Ravindra Pesala <[email protected]> Committed: Mon Aug 14 01:30:40 2017 +0530 ---------------------------------------------------------------------- .../core/datamap/DataMapDistributable.java | 56 +++ .../carbondata/core/datamap/DataMapMeta.java | 42 ++ .../core/datamap/DataMapStoreManager.java | 144 +++++++ .../carbondata/core/datamap/TableDataMap.java | 142 +++++++ .../carbondata/core/datamap/dev/DataMap.java | 57 +++ .../core/datamap/dev/DataMapFactory.java | 73 ++++ .../core/datamap/dev/DataMapWriter.java | 58 +++ .../core/datastore/page/EncodedTablePage.java | 11 - .../indexstore/BlockletDataMapIndexStore.java | 13 +- .../carbondata/core/indexstore/DataMap.java | 60 --- .../core/indexstore/DataMapDistributable.java | 56 --- .../core/indexstore/DataMapFactory.java | 87 ---- .../core/indexstore/DataMapStoreManager.java | 139 ------- .../carbondata/core/indexstore/DataMapType.java | 36 -- .../core/indexstore/DataMapWriter.java | 50 --- .../core/indexstore/TableDataMap.java | 133 ------ .../blockletindex/BlockletDataMap.java | 45 +- .../blockletindex/BlockletDataMapFactory.java | 46 +- .../core/metadata/AbsoluteTableIdentifier.java | 4 + .../core/metadata/CarbonTableIdentifier.java | 6 +- .../apache/carbondata/core/util/CarbonUtil.java | 6 + .../hadoop/api/CarbonTableInputFormat.java | 16 +- .../spark/load/CarbonLoaderUtilTest.java | 417 ------------------- .../validation/FileFooterValidator.java | 155 ------- .../testsuite/datamap/DataMapWriterSuite.scala | 180 ++++++++ .../spark/rdd/DataManagementFunc.scala | 5 
+- .../spark/sql/hive/CarbonFileMetastore.scala | 2 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 2 +- .../iud/DeleteCarbonTableSubqueryTestCase.scala | 5 +- .../core/datastore/GenericDataType.java | 145 ------- .../datamap/DataMapWriterListener.java | 138 ++++++ .../processing/datatypes/ArrayDataType.java | 1 - .../processing/datatypes/GenericDataType.java | 145 +++++++ .../processing/datatypes/PrimitiveDataType.java | 1 - .../processing/datatypes/StructDataType.java | 1 - .../impl/ComplexFieldConverterImpl.java | 2 +- .../converter/impl/FieldEncoderFactory.java | 2 +- .../store/CarbonFactDataHandlerColumnar.java | 131 +++--- .../store/CarbonFactDataHandlerModel.java | 14 +- .../carbondata/processing/store/TablePage.java | 66 ++- .../store/writer/AbstractFactDataWriter.java | 115 +++-- .../store/writer/CarbonDataWriterVo.java | 11 + .../store/writer/CarbonFactDataWriter.java | 5 +- .../writer/v1/CarbonFactDataWriterImplV1.java | 11 +- .../writer/v2/CarbonFactDataWriterImplV2.java | 13 +- .../store/writer/v3/BlockletDataHolder.java | 72 ++++ .../writer/v3/CarbonFactDataWriterImplV3.java | 131 +++--- .../store/writer/v3/DataWriterHolder.java | 62 --- .../util/CarbonDataProcessorUtil.java | 2 +- 49 files changed, 1502 insertions(+), 1612 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java new file mode 100644 index 0000000..517f629 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapDistributable.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import org.apache.carbondata.core.datastore.block.Distributable; + +/** + * Distributable class for datamap. + */ +public abstract class DataMapDistributable implements Distributable { + + private String tablePath; + + private String segmentId; + + private String dataMapName; + + public String getTablePath() { + return tablePath; + } + + public void setTablePath(String tablePath) { + this.tablePath = tablePath; + } + + public String getSegmentId() { + return segmentId; + } + + public void setSegmentId(String segmentId) { + this.segmentId = segmentId; + } + + public String getDataMapName() { + return dataMapName; + } + + public void setDataMapName(String dataMapName) { + this.dataMapName = dataMapName; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java new file mode 100644 index 0000000..7746acf --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java @@ -0,0 +1,42 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.datamap; + +import java.util.List; + +import org.apache.carbondata.core.indexstore.schema.FilterType; + +public class DataMapMeta { + + private List<String> indexedColumns; + + private FilterType optimizedOperation; + + public DataMapMeta(List<String> indexedColumns, FilterType optimizedOperation) { + this.indexedColumns = indexedColumns; + this.optimizedOperation = optimizedOperation; + } + + public List<String> getIndexedColumns() { + return indexedColumns; + } + + public FilterType getOptimizedOperation() { + return optimizedOperation; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java new file mode 100644 index 0000000..f5bc22f --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +/** + * It maintains all the DataMaps in it. + */ +public final class DataMapStoreManager { + + private static DataMapStoreManager instance = new DataMapStoreManager(); + + /** + * Contains the list of datamaps for each table. + */ + private Map<String, List<TableDataMap>> allDataMaps = new ConcurrentHashMap<>(); + + private static final LogService LOGGER = + LogServiceFactory.getLogService(DataMapStoreManager.class.getName()); + + private DataMapStoreManager() { + + } + + public List<TableDataMap> getAllDataMap(AbsoluteTableIdentifier identifier) { + return allDataMaps.get(identifier.uniqueName()); + } + + /** + * Get the datamap for reading data. 
+ * + * @param dataMapName + * @param factoryClass + * @return + */ + public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName, + Class<? extends DataMapFactory> factoryClass) { + String table = identifier.uniqueName(); + List<TableDataMap> tableDataMaps = allDataMaps.get(table); + TableDataMap dataMap; + if (tableDataMaps == null) { + dataMap = createAndRegisterDataMap(identifier, factoryClass, dataMapName); + } else { + dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps); + } + if (dataMap == null) { + throw new RuntimeException("Datamap does not exist"); + } + return dataMap; + } + + /** + * Return a new datamap instance and registered in the store manager. + * The datamap is created using datamap name, datamap factory class and table identifier. + */ + public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier, + Class<? extends DataMapFactory> factoryClass, String dataMapName) { + String table = identifier.uniqueName(); + List<TableDataMap> tableDataMaps = allDataMaps.get(table); + if (tableDataMaps == null) { + tableDataMaps = new ArrayList<>(); + allDataMaps.put(table, tableDataMaps); + } + TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps); + if (dataMap != null) { + throw new RuntimeException("Already datamap exists in that path with type " + dataMapName); + } + + try { + DataMapFactory dataMapFactory = factoryClass.newInstance(); + dataMapFactory.init(identifier, dataMapName); + dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory); + } catch (Exception e) { + LOGGER.error(e); + throw new RuntimeException(e); + } + tableDataMaps.add(dataMap); + return dataMap; + } + + private TableDataMap getAbstractTableDataMap(String dataMapName, + List<TableDataMap> tableDataMaps) { + TableDataMap dataMap = null; + for (TableDataMap tableDataMap: tableDataMaps) { + if (tableDataMap.getDataMapName().equals(dataMapName)) { + dataMap = tableDataMap; + break; + } + } + return 
dataMap; + } + + /** + * Clear the datamap with the given name for the given table and remove it from this manager + * @param identifier table whose datamap is cleared (looked up by its unique name) + * @param dataMapName name of the datamap to clear + */ + public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) { + List<TableDataMap> tableDataMaps = allDataMaps.get(identifier.uniqueName()); + if (tableDataMaps != null) { + int i = 0; + for (TableDataMap tableDataMap: tableDataMaps) { + if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) { + tableDataMap.clear(); + tableDataMaps.remove(i); + break; + } + i++; + } + } + } + + /** + * Returns the singleton instance + * @return the shared DataMapStoreManager instance + */ + public static DataMapStoreManager getInstance() { + return instance; + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java new file mode 100644 index 0000000..b55c5d9 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.events.ChangeEvent; +import org.apache.carbondata.core.events.EventListener; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +/** + * DataMap at the table level, user can add any number of datamaps for one table. Depends + * on the filter condition it can prune the blocklets. + */ +public final class TableDataMap implements EventListener { + + private AbsoluteTableIdentifier identifier; + + private String dataMapName; + + private DataMapFactory dataMapFactory; + + /** + * It is called to initialize and load the required table datamap metadata. 
+ */ + public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName, + DataMapFactory dataMapFactory) { + this.identifier = identifier; + this.dataMapName = dataMapName; + this.dataMapFactory = dataMapFactory; + } + + /** + * Pass the valid segments and prune the datamap using filter expression + * + * @param segmentIds + * @param filterExp + * @return + */ + public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) + throws IOException { + List<Blocklet> blocklets = new ArrayList<>(); + for (String segmentId : segmentIds) { + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId); + for (DataMap dataMap : dataMaps) { + List<Blocklet> pruneBlocklets = dataMap.prune(filterExp); + blocklets.addAll(addSegmentId(pruneBlocklets, segmentId)); + } + } + return blocklets; + } + + private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) { + for (Blocklet blocklet : pruneBlocklets) { + blocklet.setSegmentId(segmentId); + } + return pruneBlocklets; + } + + /** + * This is used for making the datamap distributable. + * It takes the valid segments and returns all the datamaps as distributable objects so that + * it can be distributed across machines. + * + * @return + */ + public List<DataMapDistributable> toDistributable(List<String> segmentIds) throws IOException { + List<DataMapDistributable> distributables = new ArrayList<>(); + for (String segmentsId : segmentIds) { + List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId); + for (DataMap dataMap : dataMaps) { + distributables.add(dataMap.toDistributable()); + } + } + return distributables; + } + + /** + * This method is used from any machine after it is distributed. It takes the distributable object + * to prune the filters. 
+ * + * @param distributable + * @param filterExp + * @return + */ + public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) { + return dataMapFactory.getDataMap(distributable).prune(filterExp); + } + + @Override public void fireEvent(ChangeEvent event) { + dataMapFactory.fireEvent(event); + } + + /** + * Clear only the datamaps of the segments + * @param segmentIds + */ + public void clear(List<String> segmentIds) { + for (String segmentId: segmentIds) { + dataMapFactory.clear(segmentId); + } + } + + /** + * Clears all datamap + */ + public void clear() { + dataMapFactory.clear(); + } + /** + * Get the unique name of datamap + * + * @return + */ + public String getDataMapName() { + return dataMapName; + } + + public DataMapFactory getDataMapFactory() { + return dataMapFactory; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java new file mode 100644 index 0000000..526572a --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap.dev; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; + +/** + * Datamap is an entity which can store and retrieve index data. + */ +public interface DataMap { + + /** + * It is called to load the data map to memory or to initialize it. + */ + void init(String path) throws MemoryException, IOException; + + /** + * Prune the datamap with filter expression. It returns the list of + * blocklets where these filters can exist. + * + * @param filterExp + * @return + */ + List<Blocklet> prune(FilterResolverIntf filterExp); + + /** + * Convert datamap to distributable object + * @return + */ + DataMapDistributable toDistributable(); + + /** + * Clear complete index table and release memory. 
+ */ + void clear(); + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java new file mode 100644 index 0000000..873457c --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap.dev; + +import java.io.IOException; +import java.util.List; + +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.events.ChangeEvent; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; + +/** + * Interface for datamap factory, it is responsible for creating the datamap. 
+ */ +public interface DataMapFactory { + + /** + * Initialization of Datamap factory with the identifier and datamap name + */ + void init(AbsoluteTableIdentifier identifier, String dataMapName); + + /** + * Return a new writer for this datamap for the given segment id + */ + DataMapWriter createWriter(String segmentId); + + /** + * Get the datamaps for the given segment id + */ + List<DataMap> getDataMaps(String segmentId) throws IOException; + + /** + * Get datamap for distributable object. + */ + DataMap getDataMap(DataMapDistributable distributable); + + /** + * Receive a change event; TableDataMap forwards its own fireEvent calls to this method + * @param event the change event being delivered + */ + void fireEvent(ChangeEvent event); + + /** + * Clears datamap of the segment + */ + void clear(String segmentId); + + /** + * Clear all datamaps from memory + */ + void clear(); + + /** + * Return metadata of this datamap + */ + DataMapMeta getMeta(); +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java new file mode 100644 index 0000000..28163d7 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.datamap.dev; + +import org.apache.carbondata.core.datastore.page.ColumnPage; + +/** + * Data Map writer + */ +public interface DataMapWriter { + + /** + * Start of new block notification. + * @param blockId file name of the carbondata file + */ + void onBlockStart(String blockId); + + /** + * End of block notification + */ + void onBlockEnd(String blockId); + + /** + * Start of new blocklet notification. + * @param blockletId sequence number of blocklet in the block + */ + void onBlockletStart(int blockletId); + + /** + * End of blocklet notification + * @param blockletId sequence number of blocklet in the block + */ + void onBlockletEnd(int blockletId); + + /** + * Add the column pages row to the datamap, order of pages is same as `indexColumns` in + * DataMapMeta returned in DataMapFactory. + * + * Implementation should copy the content of `pages` as needed, because `pages` memory + * may be freed after this method returns, if using unsafe column page. 
+ */ + void onPageAdded(int blockletId, int pageId, ColumnPage[] pages); + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java index ea9c373..0aac1d9 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/EncodedTablePage.java @@ -42,9 +42,6 @@ public class EncodedTablePage { // number of row in this page private int pageSize; - // true if it is last page of all input rows - private boolean isLastPage; - // size in bytes of all encoded columns (including data and metadate) private int encodedSize; @@ -128,14 +125,6 @@ public class EncodedTablePage { return pageKey; } - public boolean isLastPage() { - return isLastPage; - } - - public void setIsLastPage(boolean isWriteAll) { - this.isLastPage = isWriteAll; - } - public EncodedMeasurePage getMeasure(int measureIndex) { return measures[measureIndex]; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java index fc8c273..9d4af7b 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDataMapIndexStore.java @@ -26,8 +26,8 @@ import org.apache.carbondata.common.logging.LogService; import 
org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CarbonLRUCache; -import org.apache.carbondata.core.datastore.exception.IndexBuilderException; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; +import org.apache.carbondata.core.memory.MemoryException; /** * Class to handle loading, unloading,clearing,storing of the table @@ -73,10 +73,9 @@ public class BlockletDataMapIndexStore if (dataMap == null) { try { dataMap = loadAndGetDataMap(tableSegmentUniqueIdentifier); - } catch (IndexBuilderException e) { - throw new IOException(e.getMessage(), e); - } catch (Throwable e) { - throw new IOException("Problem in loading segment block.", e); + } catch (MemoryException e) { + LOGGER.error("memory exception when loading datamap: " + e.getMessage()); + throw new RuntimeException(e.getMessage(), e); } } return dataMap; @@ -93,6 +92,7 @@ public class BlockletDataMapIndexStore for (BlockletDataMap dataMap : blockletDataMaps) { dataMap.clear(); } + e.printStackTrace(); throw new IOException("Problem in loading segment blocks.", e); } return blockletDataMaps; @@ -130,7 +130,8 @@ public class BlockletDataMapIndexStore * @throws IOException */ private BlockletDataMap loadAndGetDataMap( - TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException { + TableBlockIndexUniqueIdentifier tableSegmentUniqueIdentifier) + throws IOException, MemoryException { String uniqueTableSegmentIdentifier = tableSegmentUniqueIdentifier.getUniqueTableSegmentIdentifier(); Object lock = segmentLockMap.get(uniqueTableSegmentIdentifier); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java 
b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java deleted file mode 100644 index 1276494..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMap.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.indexstore; - -import java.util.List; - -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; - -/** - * Datamap is an entity which can store and retrieve index data. - */ -public interface DataMap { - - /** - * Give the writer to write the data. - * - * @return - */ - DataMapWriter getWriter(); - - /** - * It is called to load the data map to memory or to initialize it. - */ - void init(String path); - - /** - * Prune the datamap with filter expression. It returns the list of - * blocklets where these filters can exist. - * - * @param filterExp - * @return - */ - List<Blocklet> prune(FilterResolverIntf filterExp); - - /** - * Convert datamap to distributable object - * @return - */ - DataMapDistributable toDistributable(); - - /** - * Clear complete index table and release memory. 
- */ - void clear(); - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java deleted file mode 100644 index 4c379f3..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapDistributable.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.indexstore; - -import org.apache.carbondata.core.datastore.block.Distributable; - -/** - * Distributable class for datamap. 
- */ -public abstract class DataMapDistributable implements Distributable { - - private String tablePath; - - private String segmentId; - - private String dataMapName; - - public String getTablePath() { - return tablePath; - } - - public void setTablePath(String tablePath) { - this.tablePath = tablePath; - } - - public String getSegmentId() { - return segmentId; - } - - public void setSegmentId(String segmentId) { - this.segmentId = segmentId; - } - - public String getDataMapName() { - return dataMapName; - } - - public void setDataMapName(String dataMapName) { - this.dataMapName = dataMapName; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java deleted file mode 100644 index 72f714f..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapFactory.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.indexstore; - -import java.util.List; - -import org.apache.carbondata.core.events.ChangeEvent; -import org.apache.carbondata.core.indexstore.schema.FilterType; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; - -/** - * Interface for datamap factory, it is responsible for creating the datamap. - */ -public interface DataMapFactory { - - /** - * Initialization of Datamap factory - * @param identifier - * @param dataMapName - */ - void init(AbsoluteTableIdentifier identifier, String dataMapName); - /** - * Get the datamap writer for each segmentid. - * - * @param identifier - * @param segmentId - * @return - */ - DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, - String segmentId); - - /** - * Get the datamap for segmentid - * - * @param segmentId - * @return - */ - List<DataMap> getDataMaps(String segmentId); - - /** - * Get datamap for distributable object. - * - * @param distributable - * @return - */ - DataMap getDataMap(DataMapDistributable distributable); - - /** - * This method checks whether the columns and the type of filters supported - * for this datamap or not - * - * @param filterType - * @return - */ - boolean isFiltersSupported(FilterType filterType); - - /** - * - * @param event - */ - void fireEvent(ChangeEvent event); - - /** - * Clears datamap of the segment - */ - void clear(String segmentId); - - /** - * Clear all datamaps from memory - */ - void clear(); - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java deleted file mode 100644 index 1664a6a..0000000 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapStoreManager.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.core.indexstore; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; - -/** - * It maintains all the DataMaps in it. - */ -public final class DataMapStoreManager { - - private static DataMapStoreManager instance = new DataMapStoreManager(); - - /** - * Contains the list of datamaps for each table. - */ - private Map<AbsoluteTableIdentifier, List<TableDataMap>> dataMapMappping = new HashMap<>(); - - private static final LogService LOGGER = - LogServiceFactory.getLogService(DataMapStoreManager.class.getName()); - - private DataMapStoreManager() { - - } - - /** - * Get the datamap for reading data. 
- * - * @param dataMapName - * @param mapType - * @return - */ - public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, String dataMapName, - DataMapType mapType) { - List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier); - TableDataMap dataMap; - if (tableDataMaps == null) { - createTableDataMap(identifier, mapType, dataMapName); - tableDataMaps = dataMapMappping.get(identifier); - } - dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps); - if (dataMap == null) { - throw new RuntimeException("Datamap does not exist"); - } - return dataMap; - } - - /** - * Create new datamap instance using datamap name, datamap type and table identifier - * - * @param mapType - * @return - */ - private TableDataMap createTableDataMap(AbsoluteTableIdentifier identifier, - DataMapType mapType, String dataMapName) { - List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier); - if (tableDataMaps == null) { - tableDataMaps = new ArrayList<>(); - dataMapMappping.put(identifier, tableDataMaps); - } - TableDataMap dataMap = getAbstractTableDataMap(dataMapName, tableDataMaps); - if (dataMap != null) { - throw new RuntimeException("Already datamap exists in that path with type " + mapType); - } - - try { - DataMapFactory dataMapFactory = mapType.getClassObject().newInstance(); - dataMapFactory.init(identifier, dataMapName); - dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory); - } catch (Exception e) { - LOGGER.error(e); - throw new RuntimeException(e); - } - tableDataMaps.add(dataMap); - return dataMap; - } - - private TableDataMap getAbstractTableDataMap(String dataMapName, - List<TableDataMap> tableDataMaps) { - TableDataMap dataMap = null; - for (TableDataMap tableDataMap: tableDataMaps) { - if (tableDataMap.getDataMapName().equals(dataMapName)) { - dataMap = tableDataMap; - break; - } - } - return dataMap; - } - - /** - * Clear the datamap/datamaps of a mentioned datamap name and table from memory - * @param identifier - * 
@param dataMapName - */ - public void clearDataMap(AbsoluteTableIdentifier identifier, String dataMapName) { - List<TableDataMap> tableDataMaps = dataMapMappping.get(identifier); - if (tableDataMaps != null) { - int i = 0; - for (TableDataMap tableDataMap: tableDataMaps) { - if (tableDataMap != null && dataMapName.equals(tableDataMap.getDataMapName())) { - tableDataMap.clear(); - tableDataMaps.remove(i); - break; - } - i++; - } - } - } - - /** - * Returns the singleton instance - * @return - */ - public static DataMapStoreManager getInstance() { - return instance; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java deleted file mode 100644 index 0059b29..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapType.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.indexstore; - -import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; - -/** - * Datamap type - */ -public enum DataMapType { - BLOCKLET(BlockletDataMapFactory.class); - - private Class<? extends DataMapFactory> classObject; - - DataMapType(Class<? extends DataMapFactory> classObject) { - this.classObject = classObject; - } - - public Class<? extends DataMapFactory> getClassObject() { - return classObject; - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java deleted file mode 100644 index bd8be09..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/DataMapWriter.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.indexstore; - -import java.io.DataOutput; - -/** - * Data Map writer - */ -public interface DataMapWriter<T> { - - /** - * Initialize the data map writer with output stream - * - * @param outStream - */ - void init(DataOutput outStream); - - /** - * Add the index row to the in-memory store. - */ - void writeData(T data); - - /** - * Get the added row count - * - * @return - */ - int getRowCount(); - - /** - * Finish writing of data map table, otherwise it will not be allowed to read. - */ - void finish(); - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java deleted file mode 100644 index 39ca4c5..0000000 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/TableDataMap.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.core.indexstore; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.core.events.ChangeEvent; -import org.apache.carbondata.core.events.EventListener; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -/** - * DataMap at the table level, user can add any number of datamaps for one table. Depends - * on the filter condition it can prune the blocklets. - */ -public final class TableDataMap implements EventListener { - - private AbsoluteTableIdentifier identifier; - - private String dataMapName; - - private DataMapFactory dataMapFactory; - - /** - * It is called to initialize and load the required table datamap metadata. - */ - public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName, - DataMapFactory dataMapFactory) { - this.identifier = identifier; - this.dataMapName = dataMapName; - this.dataMapFactory = dataMapFactory; - } - - /** - * Pass the valid segments and prune the datamap using filter expression - * - * @param segmentIds - * @param filterExp - * @return - */ - public List<Blocklet> prune(List<String> segmentIds, FilterResolverIntf filterExp) { - List<Blocklet> blocklets = new ArrayList<>(); - for (String segmentId : segmentIds) { - List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentId); - for (DataMap dataMap : dataMaps) { - List<Blocklet> pruneBlocklets = dataMap.prune(filterExp); - blocklets.addAll(addSegmentId(pruneBlocklets, segmentId)); - } - } - return blocklets; - } - - private List<Blocklet> addSegmentId(List<Blocklet> pruneBlocklets, String segmentId) { - for (Blocklet blocklet : pruneBlocklets) { - blocklet.setSegmentId(segmentId); - } - return pruneBlocklets; - } - - /** - * This is used for making the datamap distributable. 
- * It takes the valid segments and returns all the datamaps as distributable objects so that - * it can be distributed across machines. - * - * @return - */ - public List<DataMapDistributable> toDistributable(List<String> segmentIds) { - List<DataMapDistributable> distributables = new ArrayList<>(); - for (String segmentsId : segmentIds) { - List<DataMap> dataMaps = dataMapFactory.getDataMaps(segmentsId); - for (DataMap dataMap : dataMaps) { - distributables.add(dataMap.toDistributable()); - } - } - return distributables; - } - - /** - * This method is used from any machine after it is distributed. It takes the distributable object - * to prune the filters. - * - * @param distributable - * @param filterExp - * @return - */ - public List<Blocklet> prune(DataMapDistributable distributable, FilterResolverIntf filterExp) { - return dataMapFactory.getDataMap(distributable).prune(filterExp); - } - - @Override public void fireEvent(ChangeEvent event) { - dataMapFactory.fireEvent(event); - } - - /** - * Clear only the datamaps of the segments - * @param segmentIds - */ - public void clear(List<String> segmentIds) { - for (String segmentId: segmentIds) { - dataMapFactory.clear(segmentId); - } - } - - /** - * Clears all datamap - */ - public void clear() { - dataMapFactory.clear(); - } - /** - * Get the unique name of datamap - * - * @return - */ - public String getDataMapName() { - return dataMapName; - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 4b5be11..2e82c46 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ 
b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -31,14 +31,13 @@ import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.cache.Cacheable; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datastore.IndexKey; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.block.TableBlockInfo; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailInfo; -import org.apache.carbondata.core.indexstore.DataMap; -import org.apache.carbondata.core.indexstore.DataMapDistributable; -import org.apache.carbondata.core.indexstore.DataMapWriter; import org.apache.carbondata.core.indexstore.UnsafeMemoryDMStore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; @@ -64,6 +63,8 @@ public class BlockletDataMap implements DataMap, Cacheable { private static final LogService LOGGER = LogServiceFactory.getLogService(BlockletDataMap.class.getName()); + public static final String NAME = "clustered.btree.blocklet"; + private static int KEY_INDEX = 0; private static int MIN_VALUES_INDEX = 1; @@ -88,31 +89,23 @@ public class BlockletDataMap implements DataMap, Cacheable { private int[] columnCardinality; - @Override public DataMapWriter getWriter() { - return null; - } - - @Override public void init(String path) { + @Override public void init(String path) throws IOException, MemoryException { DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); - try { - List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path); - for (DataFileFooter fileFooter : indexInfo) { - List<ColumnSchema> columnInTable = 
fileFooter.getColumnInTable(); - if (segmentProperties == null) { - columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); - segmentProperties = new SegmentProperties(columnInTable, columnCardinality); - createSchema(segmentProperties); - } - TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); - fileFooter = CarbonUtil.readMetadatFile(blockInfo); - - loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath()); - } - if (unsafeMemoryDMStore != null) { - unsafeMemoryDMStore.finishWriting(); + List<DataFileFooter> indexInfo = fileFooterConverter.getIndexInfo(path); + for (DataFileFooter fileFooter : indexInfo) { + List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + createSchema(segmentProperties); } - } catch (Exception e) { - throw new RuntimeException(e); + TableBlockInfo blockInfo = fileFooter.getBlockInfo().getTableBlockInfo(); + fileFooter = CarbonUtil.readMetadatFile(blockInfo); + + loadToUnsafe(fileFooter, segmentProperties, blockInfo.getFilePath()); + } + if (unsafeMemoryDMStore != null) { + unsafeMemoryDMStore.finishWriting(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 2fe6643..e189931 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ 
-25,16 +25,16 @@ import java.util.Map; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.CacheProvider; import org.apache.carbondata.core.cache.CacheType; +import org.apache.carbondata.core.datamap.DataMapDistributable; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.events.ChangeEvent; -import org.apache.carbondata.core.indexstore.DataMap; -import org.apache.carbondata.core.indexstore.DataMapDistributable; -import org.apache.carbondata.core.indexstore.DataMapFactory; -import org.apache.carbondata.core.indexstore.DataMapWriter; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; -import org.apache.carbondata.core.indexstore.schema.FilterType; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; /** @@ -44,21 +44,25 @@ public class BlockletDataMapFactory implements DataMapFactory { private AbsoluteTableIdentifier identifier; + // segmentId -> list of index file private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache; + @Override public void init(AbsoluteTableIdentifier identifier, String dataMapName) { this.identifier = identifier; cache = CacheProvider.getInstance() .createCache(CacheType.DRIVER_BLOCKLET_DATAMAP, identifier.getStorePath()); } - public DataMapWriter getDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId) { - return null; + @Override + public DataMapWriter createWriter(String segmentId) { + throw new UnsupportedOperationException("not implemented"); 
} - public List<DataMap> getDataMaps(String segmentId) { + @Override + public List<DataMap> getDataMaps(String segmentId) throws IOException { List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = segmentMap.get(segmentId); if (tableBlockIndexUniqueIdentifiers == null) { @@ -77,17 +81,10 @@ public class BlockletDataMapFactory implements DataMapFactory { } } - try { - return cache.getAll(tableBlockIndexUniqueIdentifiers); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override public boolean isFiltersSupported(FilterType filterType) { - return true; + return cache.getAll(tableBlockIndexUniqueIdentifiers); } + @Override public void clear(String segmentId) { List<TableBlockIndexUniqueIdentifier> blockIndexes = segmentMap.remove(segmentId); if (blockIndexes != null) { @@ -99,17 +96,26 @@ public class BlockletDataMapFactory implements DataMapFactory { } } - @Override public void clear() { + @Override + public void clear() { for (String segmentId: segmentMap.keySet()) { clear(segmentId); } } - @Override public DataMap getDataMap(DataMapDistributable distributable) { + @Override + public DataMap getDataMap(DataMapDistributable distributable) { return null; } - @Override public void fireEvent(ChangeEvent event) { + @Override + public void fireEvent(ChangeEvent event) { } + + @Override + public DataMapMeta getMeta() { + // TODO: pass SORT_COLUMNS into this class + return null; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java index 22faaf2..31ad03b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java +++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java @@ -156,4 +156,8 @@ public class AbsoluteTableIdentifier implements Serializable { } return true; } + + public String uniqueName() { + return storePath + "/" + carbonTableIdentifier.toString().toLowerCase(); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java index 31a0b23..cc65d9b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonTableIdentifier.java @@ -128,9 +128,9 @@ public class CarbonTableIdentifier implements Serializable { return true; } - /* - * @return table unidque name - */ + /** + * return unique table name + */ @Override public String toString() { return databaseName + '_' + tableName; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index edc4c28..15512a8 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1950,5 +1950,11 @@ public final class CarbonUtil { throw new IllegalArgumentException("Invalid data type: " + meta.getType()); } } + + public static void requireNotNull(Object obj) { + if (obj == null) { + throw new IllegalArgumentException("parameter not be null"); + } + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 54ad18b..19e264b 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -30,11 +30,12 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier; import org.apache.carbondata.core.indexstore.Blocklet; -import org.apache.carbondata.core.indexstore.DataMapStoreManager; -import org.apache.carbondata.core.indexstore.DataMapType; -import org.apache.carbondata.core.indexstore.TableDataMap; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; +import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; @@ -246,7 +247,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { @Override public List<InputSplit> getSplits(JobContext job) throws IOException { AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); TableDataMap blockletMap = - DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET); + DataMapStoreManager.getInstance().getDataMap(identifier, BlockletDataMap.NAME, + 
BlockletDataMapFactory.class); List<String> invalidSegments = new ArrayList<>(); List<UpdateVO> invalidTimestampsList = new ArrayList<>(); List<String> validSegments = Arrays.asList(getSegmentsToAccess(job)); @@ -403,7 +405,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration()); TableDataMap blockletMap = DataMapStoreManager.getInstance() - .getDataMap(absoluteTableIdentifier, "blocklet", DataMapType.BLOCKLET); + .getDataMap(absoluteTableIdentifier, BlockletDataMap.NAME, BlockletDataMapFactory.class); List<Blocklet> prunedBlocklets = blockletMap.prune(segmentIds, resolver); List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>(); @@ -549,8 +551,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { */ public BlockMappingVO getBlockRowCount(JobContext job, AbsoluteTableIdentifier identifier) throws IOException, KeyGenException { - TableDataMap blockletMap = - DataMapStoreManager.getInstance().getDataMap(identifier, "blocklet", DataMapType.BLOCKLET); + TableDataMap blockletMap = DataMapStoreManager.getInstance() + .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(identifier); SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = new SegmentStatusManager(identifier).getValidAndInvalidSegments(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java deleted 
file mode 100644 index 76c7e6f..0000000 --- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/load/CarbonLoaderUtilTest.java +++ /dev/null @@ -1,417 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.integration.spark.load; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.carbondata.core.datastore.block.Distributable; -import org.apache.carbondata.core.datastore.block.TableBlockInfo; -import org.apache.carbondata.spark.load.CarbonLoaderUtil; -import org.junit.Assert; -import org.junit.Test; - -/** - * Test class to test block distribution functionality - */ -public class CarbonLoaderUtilTest { - List<Distributable> blockInfos = null; - int noOfNodesInput = -1; - List<String> activeNode = null; - Map<String, List<Distributable>> expected = null; - Map<String, List<Distributable>> mapOfNodes = null; - - @Test public void nodeBlockMapping() throws Exception { - - // scenario when the 3 nodes and 3 
executors - initSet1(); - Map<String, List<Distributable>> mapOfNodes = - CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode); - // node allocation - Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size()); - // block allocation - boolean isEqual = compareResult(expected, mapOfNodes); - Assert.assertTrue("Block Allocation", isEqual); - - // 2 node and 3 executors - initSet2(); - mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode); - // node allocation - Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size()); - // block allocation - isEqual = compareResult(expected, mapOfNodes); - Assert.assertTrue("Block Allocation", isEqual); - - // 3 data node and 2 executors - initSet3(); - mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode); - // node allocation - Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size()); - // block allocation - isEqual = compareResult(expected, mapOfNodes); - Assert.assertTrue("Block Allocation", isEqual); - } - - /** - * compares the blocks allocation - * - * @param expectedResult - * @param actualResult - * @return - */ - private boolean compareResult(Map<String, List<Distributable>> expectedResult, - Map<String, List<Distributable>> actualResult) { - expectedResult = sortByListSize(expectedResult); - actualResult = sortByListSize(actualResult); - List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet()); - List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet()); - boolean isEqual = expectedList.size() == mapOfNodesList.size(); - if (isEqual) { - for (int i = 0; i < expectedList.size(); i++) { - int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size(); - int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size(); - isEqual = size1 == size2; - if (!isEqual) { - break; - } - } - } - return isEqual; - } - - /** - 
* sort by list size - * - * @param map - * @return - */ - private static Map<String, List<Distributable>> sortByListSize( - Map<String, List<Distributable>> map) { - List<List<Distributable>> list = new LinkedList(map.entrySet()); - Collections.sort(list, new Comparator() { - public int compare(Object obj1, Object obj2) { - if (obj1 == null && obj2 == null) { - return 0; - } else if (obj1 == null) { - return 1; - } else if (obj2 == null) { - return -1; - } - int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size(); - int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size(); - return size2 - size1; - } - }); - - Map res = new LinkedHashMap(); - for (Iterator it = list.iterator(); it.hasNext(); ) { - Map.Entry entry = (Map.Entry) it.next(); - res.put(entry.getKey(), entry.getValue()); - } - return res; - } - - void initSet1() { - blockInfos = new ArrayList<>(); - activeNode = new ArrayList<>(); - activeNode.add("node-7"); - activeNode.add("node-9"); - activeNode.add("node-11"); - String[] location = { "node-7", "node-9", "node-11" }; - blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0)); - expected = new HashMap<>(); - expected.put("node-7", blockInfos.subList(0, 2)); - expected.put("node-9", blockInfos.subList(2, 4)); - expected.put("node-11", blockInfos.subList(4, 6)); - } - - void initSet2() { - blockInfos = new ArrayList<>(); - activeNode = new ArrayList<>(); - activeNode.add("node-7"); - activeNode.add("node-9"); - activeNode.add("node-11"); - String[] location = { "node-7", "node-11" }; - blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 2, "1", location, 
0)); - blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0)); - expected = new HashMap<>(); - expected.put("node-7", blockInfos.subList(0, 2)); - expected.put("node-9", blockInfos.subList(2, 4)); - expected.put("node-11", blockInfos.subList(4, 6)); - } - - void initSet3() { - blockInfos = new ArrayList<>(); - activeNode = new ArrayList<>(); - activeNode.add("node-7"); - activeNode.add("node-11"); - String[] location = { "node-7", "node-9", "node-11" }; - blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0)); - blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0)); - expected = new HashMap<>(); - expected.put("node-7", blockInfos.subList(0, 3)); - expected.put("node-11", blockInfos.subList(3, 6)); - } - - - /** - * Test case with 4 blocks and 4 nodes with 3 replication. 
- * - * @throws Exception - */ - @Test public void nodeBlockMapping() throws Exception { - - Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5); - - TableBlockInfo block1 = - new TableBlockInfo("path1", 123, "1", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block2 = - new TableBlockInfo("path2", 123, "2", new String[] { "2", "3", "4" }, 111); - TableBlockInfo block3 = - new TableBlockInfo("path3", 123, "3", new String[] { "3", "4", "1" }, 111); - TableBlockInfo block4 = - new TableBlockInfo("path4", 123, "4", new String[] { "1", "2", "4" }, 111); - - inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block2, Arrays.asList(new String[]{"2","3","4"})); - inputMap.put(block3, Arrays.asList(new String[]{"3","4","1"})); - inputMap.put(block4, Arrays.asList(new String[]{"1","2","4"})); - - List<TableBlockInfo> inputBlocks = new ArrayList(6); - inputBlocks.add(block1); - inputBlocks.add(block2); - inputBlocks.add(block3); - inputBlocks.add(block4); - - Map<String, List<TableBlockInfo>> outputMap - = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4); - - Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 4, 4)); - - Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 4, 4)); - } - - private boolean calculateBlockLocality(Map<TableBlockInfo, List<String>> inputMap, - Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) { - - double notInNodeLocality = 0; - for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) { - - List<TableBlockInfo> blockListOfANode = entry.getValue(); - - for (TableBlockInfo eachBlock : blockListOfANode) { - - // for each block check the node locality - - List<String> blockLocality = inputMap.get(eachBlock); - if (!blockLocality.contains(entry.getKey())) { - notInNodeLocality++; - } - } - } - - System.out.println( - ((notInNodeLocality / numberOfBlocks) * 100) + " " + "is the node locality mismatch"); 
- if ((notInNodeLocality / numberOfBlocks) * 100 > 30) { - return false; - } - return true; - } - - private boolean calculateBlockDistribution(Map<TableBlockInfo, List<String>> inputMap, - Map<String, List<TableBlockInfo>> outputMap, int numberOfBlocks, int numberOfNodes) { - - int nodesPerBlock = numberOfBlocks / numberOfNodes; - - for (Map.Entry<String, List<TableBlockInfo>> entry : outputMap.entrySet()) { - - if (entry.getValue().size() < nodesPerBlock) { - return false; - } - } - return true; - } - - /** - * Test case with 5 blocks and 3 nodes - * - * @throws Exception - */ - @Test public void nodeBlockMappingTestWith5blocks3nodes() throws Exception { - - Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5); - - TableBlockInfo block1 = - new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block2 = - new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block3 = - new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block4 = - new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block5 = - new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111); - - inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"})); - - List<TableBlockInfo> inputBlocks = new ArrayList(6); - inputBlocks.add(block1); - inputBlocks.add(block2); - inputBlocks.add(block3); - inputBlocks.add(block4); - inputBlocks.add(block5); - - Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 3); - - 
Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 5, 3)); - - Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 5, 3)); - - } - - /** - * Test case with 6 blocks and 4 nodes where 4 th node doesnt have any local data. - * - * @throws Exception - */ - @Test public void nodeBlockMappingTestWith6Blocks4nodes() throws Exception { - - Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5); - - TableBlockInfo block1 = - new TableBlockInfo("part-0-0-1462341987000", 123, "1", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block2 = - new TableBlockInfo("part-1-0-1462341987000", 123, "2", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block3 = - new TableBlockInfo("part-2-0-1462341987000", 123, "3", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block4 = - new TableBlockInfo("part-3-0-1462341987000", 123, "4", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block5 = - new TableBlockInfo("part-4-0-1462341987000", 123, "5", new String[] { "1", "2", "3" }, 111); - TableBlockInfo block6 = - new TableBlockInfo("part-5-0-1462341987000", 123, "6", new String[] { "1", "2", "3" }, 111); - - inputMap.put(block1, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block2, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block3, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block4, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block5, Arrays.asList(new String[]{"1","2","3"})); - inputMap.put(block6, Arrays.asList(new String[]{"1","2","3"})); - - - List<TableBlockInfo> inputBlocks = new ArrayList(6); - inputBlocks.add(block1); - inputBlocks.add(block2); - inputBlocks.add(block3); - inputBlocks.add(block4); - inputBlocks.add(block5); - inputBlocks.add(block6); - - Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4); - - Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 6, 4)); - - 
Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 6, 4)); - - } - - /** - * Test case with 10 blocks and 4 nodes with 10,60,30 % distribution - * - * @throws Exception - */ - @Test public void nodeBlockMappingTestWith10Blocks4nodes() throws Exception { - - Map<TableBlockInfo, List<String>> inputMap = new HashMap<TableBlockInfo, List<String>>(5); - - TableBlockInfo block1 = - new TableBlockInfo("part-1-0-1462341987000", 123, "1", new String[] { "2", "4" }, 111); - TableBlockInfo block2 = - new TableBlockInfo("part-2-0-1462341987000", 123, "2", new String[] { "2", "4" }, 111); - TableBlockInfo block3 = - new TableBlockInfo("part-3-0-1462341987000", 123, "3", new String[] { "2", "4" }, 111); - TableBlockInfo block4 = - new TableBlockInfo("part-4-0-1462341987000", 123, "4", new String[] { "2", "4" }, 111); - TableBlockInfo block5 = - new TableBlockInfo("part-5-0-1462341987000", 123, "5", new String[] { "2", "4" }, 111); - TableBlockInfo block6 = - new TableBlockInfo("part-6-0-1462341987000", 123, "6", new String[] { "2", "4" }, 111); - TableBlockInfo block7 = - new TableBlockInfo("part-7-0-1462341987000", 123, "7", new String[] { "3", "4" }, 111); - TableBlockInfo block8 = - new TableBlockInfo("part-8-0-1462341987000", 123, "8", new String[] { "3", "4" }, 111); - TableBlockInfo block9 = - new TableBlockInfo("part-9-0-1462341987000", 123, "9", new String[] { "3", "4" }, 111); - TableBlockInfo block10 = - new TableBlockInfo("part-10-0-1462341987000", 123, "9", new String[] { "1", "4" }, 111); - - inputMap.put(block1, Arrays.asList(new String[]{"2","4"})); - inputMap.put(block2, Arrays.asList(new String[]{"2","4"})); - inputMap.put(block3, Arrays.asList(new String[]{"2","4"})); - inputMap.put(block4, Arrays.asList(new String[]{"2","4"})); - inputMap.put(block5, Arrays.asList(new String[]{"2","4"})); - inputMap.put(block6, Arrays.asList(new String[]{"2","4"})); - inputMap.put(block7, Arrays.asList(new String[]{"3","4"})); - inputMap.put(block8, 
Arrays.asList(new String[]{"3","4"})); - inputMap.put(block9, Arrays.asList(new String[]{"3","4"})); - inputMap.put(block10, Arrays.asList(new String[]{"1","4"})); - - List<TableBlockInfo> inputBlocks = new ArrayList(6); - inputBlocks.add(block1); - inputBlocks.add(block2); - inputBlocks.add(block3); - inputBlocks.add(block4); - inputBlocks.add(block5); - inputBlocks.add(block6); - inputBlocks.add(block7); - inputBlocks.add(block8); - inputBlocks.add(block9); - inputBlocks.add(block10); - - Map<String, List<TableBlockInfo>> outputMap = CarbonLoaderUtil.nodeBlockMapping(inputBlocks, 4); - - Assert.assertTrue(calculateBlockDistribution(inputMap, outputMap, 10, 4)); - - Assert.assertTrue(calculateBlockLocality(inputMap, outputMap, 10, 4)); - } - -} \ No newline at end of file
