[CARBONDATA-1480] Min Max Index Example for DataMap. Implementation of Min Max Index through DataMap, and using the index while pruning.
This closes #1359 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6882f737 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6882f737 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6882f737 Branch: refs/heads/carbonstore-rebase5 Commit: 6882f737ebafb8a6470be638e7f4bb273e6c7292 Parents: 8da481a Author: sounakr <soun...@gmail.com> Authored: Thu Sep 28 16:21:05 2017 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Mar 2 15:52:34 2018 +0800 ---------------------------------------------------------------------- .../core/datamap/DataMapStoreManager.java | 16 +- .../carbondata/core/datamap/TableDataMap.java | 18 +- .../carbondata/core/datamap/dev/DataMap.java | 11 +- .../core/datamap/dev/DataMapWriter.java | 3 +- .../indexstore/SegmentPropertiesFetcher.java | 36 +++ .../blockletindex/BlockletDataMap.java | 5 +- .../blockletindex/BlockletDataMapFactory.java | 32 ++- datamap/examples/pom.xml | 111 ++++++++++ .../datamap/examples/BlockletMinMax.java | 41 ++++ .../datamap/examples/MinMaxDataMap.java | 143 ++++++++++++ .../datamap/examples/MinMaxDataMapFactory.java | 114 ++++++++++ .../datamap/examples/MinMaxDataWriter.java | 221 +++++++++++++++++++ .../examples/MinMaxIndexBlockDetails.java | 77 +++++++ .../MinMaxDataMapExample.scala | 77 +++++++ .../testsuite/datamap/DataMapWriterSuite.scala | 2 +- pom.xml | 2 + .../datamap/DataMapWriterListener.java | 4 +- .../store/writer/AbstractFactDataWriter.java | 7 +- .../writer/v3/CarbonFactDataWriterImplV3.java | 3 + 19 files changed, 894 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index d30483a..90e5fff 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -26,6 +26,7 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMap; import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -103,7 +104,7 @@ public final class DataMapStoreManager { tableDataMaps = new ArrayList<>(); } TableDataMap dataMap = getTableDataMap(dataMapName, tableDataMaps); - if (dataMap != null) { + if (dataMap != null && dataMap.getDataMapName().equalsIgnoreCase(dataMapName)) { throw new RuntimeException("Already datamap exists in that path with type " + dataMapName); } @@ -113,12 +114,15 @@ public final class DataMapStoreManager { DataMapFactory dataMapFactory = factoryClass.newInstance(); dataMapFactory.init(identifier, dataMapName); BlockletDetailsFetcher blockletDetailsFetcher; + SegmentPropertiesFetcher segmentPropertiesFetcher = null; if (dataMapFactory instanceof BlockletDetailsFetcher) { blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory; } else { blockletDetailsFetcher = getBlockletDetailsFetcher(identifier); } - dataMap = new TableDataMap(identifier, dataMapName, dataMapFactory, blockletDetailsFetcher); + segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher; + dataMap = new TableDataMap(identifier, dataMapName, 
dataMapFactory, blockletDetailsFetcher, + segmentPropertiesFetcher); } catch (Exception e) { LOGGER.error(e); throw new RuntimeException(e); @@ -128,11 +132,11 @@ public final class DataMapStoreManager { return dataMap; } - private TableDataMap getTableDataMap(String dataMapName, - List<TableDataMap> tableDataMaps) { + private TableDataMap getTableDataMap(String dataMapName, List<TableDataMap> tableDataMaps) { TableDataMap dataMap = null; - for (TableDataMap tableDataMap: tableDataMaps) { - if (tableDataMap.getDataMapName().equals(dataMapName)) { + for (TableDataMap tableDataMap : tableDataMaps) { + if (tableDataMap.getDataMapName().equals(dataMapName) || (!tableDataMap.getDataMapName() + .equals(""))) { dataMap = tableDataMap; break; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 6555d6c..a841f37 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -23,10 +23,12 @@ import java.util.List; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; import org.apache.carbondata.events.Event; @@ -47,15 +49,19 @@ public final class TableDataMap extends OperationEventListener { private BlockletDetailsFetcher blockletDetailsFetcher; + private SegmentPropertiesFetcher segmentPropertiesFetcher; + /** * It is called to initialize and load the required table datamap metadata. */ public TableDataMap(AbsoluteTableIdentifier identifier, String dataMapName, - DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher) { + DataMapFactory dataMapFactory, BlockletDetailsFetcher blockletDetailsFetcher, + SegmentPropertiesFetcher segmentPropertiesFetcher) { this.identifier = identifier; this.dataMapName = dataMapName; this.dataMapFactory = dataMapFactory; this.blockletDetailsFetcher = blockletDetailsFetcher; + this.segmentPropertiesFetcher = segmentPropertiesFetcher; } /** @@ -68,11 +74,13 @@ public final class TableDataMap extends OperationEventListener { public List<ExtendedBlocklet> prune(List<Segment> segments, FilterResolverIntf filterExp, List<PartitionSpec> partitions) throws IOException { List<ExtendedBlocklet> blocklets = new ArrayList<>(); + SegmentProperties segmentProperties; for (Segment segment : segments) { List<Blocklet> pruneBlocklets = new ArrayList<>(); List<DataMap> dataMaps = dataMapFactory.getDataMaps(segment); + segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment.getSegmentNo()); for (DataMap dataMap : dataMaps) { - pruneBlocklets.addAll(dataMap.prune(filterExp, partitions)); + pruneBlocklets.addAll(dataMap.prune(filterExp, segmentProperties, partitions)); } blocklets.addAll(addSegmentId(blockletDetailsFetcher .getExtendedBlocklets(pruneBlocklets, segment), segment.getSegmentNo())); @@ -124,7 +132,11 @@ public final class TableDataMap extends OperationEventListener { List<Blocklet> blocklets = new ArrayList<>(); List<DataMap> dataMaps = dataMapFactory.getDataMaps(distributable); for (DataMap 
dataMap : dataMaps) { - blocklets.addAll(dataMap.prune(filterExp, partitions)); + blocklets.addAll( + dataMap.prune( + filterExp, + segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()), + partitions)); } for (Blocklet blocklet: blocklets) { ExtendedBlocklet detailedBlocklet = http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index f3642d6..434b371 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.datamap.dev; import java.io.IOException; import java.util.List; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.memory.MemoryException; @@ -35,16 +36,6 @@ public interface DataMap { void init(DataMapModel dataMapModel) throws MemoryException, IOException; /** - * Prune the datamap with filter expression. It returns the list of - * blocklets where these filters can exist. - * - * @param filterExp - * @return - */ - List<Blocklet> prune(FilterResolverIntf filterExp); - - // TODO Move this method to Abstract class - /** * Prune the datamap with filter expression and partition information. It returns the list of * blocklets where these filters can exist. 
* http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java index 28163d7..413eaa5 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapWriter.java @@ -27,7 +27,7 @@ public interface DataMapWriter { * Start of new block notification. * @param blockId file name of the carbondata file */ - void onBlockStart(String blockId); + void onBlockStart(String blockId, String blockPath); /** * End of block notification @@ -45,7 +45,6 @@ public interface DataMapWriter { * @param blockletId sequence number of blocklet in the block */ void onBlockletEnd(int blockletId); - /** * Add the column pages row to the datamap, order of pages is same as `indexColumns` in * DataMapMeta returned in DataMapFactory. http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java new file mode 100644 index 0000000..ec2ae93 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.core.indexstore; + +import java.io.IOException; + +import org.apache.carbondata.core.datastore.block.SegmentProperties; + +/** + * Fetches the detailed segmentProperties which has more information to execute the query + */ +public interface SegmentPropertiesFetcher { + + /** + * get the Segment properties based on the SegmentID. + * @param segmentId + * @return + * @throws IOException + */ + SegmentProperties getSegmentProperties(String segmentId) throws IOException; +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 9ec7a46..4bd6ae7 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -624,8 +624,7 @@ public class BlockletDataMap implements DataMap, Cacheable { return false; } - @Override - public List<Blocklet> prune(FilterResolverIntf filterExp) { + private List<Blocklet> prune(FilterResolverIntf filterExp, 
SegmentProperties segmentProperties) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } @@ -685,7 +684,7 @@ public class BlockletDataMap implements DataMap, Cacheable { } } // Prune with filters if the partitions are existed in this datamap - return prune(filterExp); + return prune(filterExp, segmentProperties); } private boolean isCorrectUUID(String[] fileDetails, PartitionSpec spec) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java index 5eb077f..89e61d2 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java @@ -31,14 +31,19 @@ import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.dev.DataMap; import org.apache.carbondata.core.datamap.dev.DataMapFactory; import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; +import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import 
org.apache.carbondata.core.metadata.SegmentFileStore; +import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.events.Event; @@ -50,13 +55,17 @@ import org.apache.hadoop.fs.RemoteIterator; /** * Table map for blocklet */ -public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher { +public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFetcher, + SegmentPropertiesFetcher { private AbsoluteTableIdentifier identifier; // segmentId -> list of index file private Map<String, List<TableBlockIndexUniqueIdentifier>> segmentMap = new HashMap<>(); + // segmentId -> SegmentProperties. + private Map<String, SegmentProperties> segmentPropertiesMap = new HashMap<>(); + private Cache<TableBlockIndexUniqueIdentifier, DataMap> cache; @Override @@ -251,4 +260,25 @@ public class BlockletDataMapFactory implements DataMapFactory, BlockletDetailsFe // TODO: pass SORT_COLUMNS into this class return null; } + + @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException { + SegmentProperties segmentProperties = segmentPropertiesMap.get(segmentId); + if (segmentProperties == null) { + int[] columnCardinality; + List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers = + getTableBlockIndexUniqueIdentifiers(segmentId); + DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); + List<DataFileFooter> indexInfo = + fileFooterConverter.getIndexInfo(tableBlockIndexUniqueIdentifiers.get(0).getFilePath()); + for (DataFileFooter fileFooter : indexInfo) { + List<ColumnSchema> columnInTable = fileFooter.getColumnInTable(); + if (segmentProperties == null) { + columnCardinality = fileFooter.getSegmentInfo().getColumnCardinality(); + 
segmentProperties = new SegmentProperties(columnInTable, columnCardinality); + } + } + segmentPropertiesMap.put(segmentId, segmentProperties); + } + return segmentProperties; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/datamap/examples/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml new file mode 100644 index 0000000..6832e62 --- /dev/null +++ b/datamap/examples/pom.xml @@ -0,0 +1,111 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.3.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-datamap-examples</artifactId> + <name>Apache CarbonData :: Datamap Examples</name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive-thriftserver_2.10</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_2.10</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-repl_${scala.binary.version}</artifactId> + </dependency> + </dependencies> + + <build> + <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory> + <resources> + <resource> + <directory>.</directory> + <includes> + <include>CARBON_EXAMPLESLogResource.properties</include> + </includes> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> 
+ <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java new file mode 100644 index 0000000..e6968fe --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/BlockletMinMax.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.examples; + + +public class BlockletMinMax { + private byte[][] Min; + + private byte[][] Max; + + public byte[][] getMin() { + return Min; + } + + public void setMin(byte[][] min) { + Min = min; + } + + public byte[][] getMax() { + return Max; + } + + public void setMax(byte[][] max) { + Max = max; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java new file mode 100644 index 0000000..2ad6327 --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMap.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.datamap.examples; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datamap.dev.DataMap; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; +import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; +import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.indexstore.Blocklet; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.scan.filter.FilterUtil; +import org.apache.carbondata.core.scan.filter.executer.FilterExecuter; +import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.util.CarbonUtil; + +import com.google.gson.Gson; + +/** + * Datamap implementation for min max blocklet. 
+ */ +public class MinMaxDataMap implements DataMap { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(MinMaxDataMap.class.getName()); + + public static final String NAME = "clustered.minmax.btree.blocklet"; + + private String filePath; + + private MinMaxIndexBlockDetails[] readMinMaxDataMap; + + @Override public void init(String filePath) throws MemoryException, IOException { + this.filePath = filePath; + CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0"); + for (int i = 0; i < listFiles.length; i++) { + readMinMaxDataMap = readJson(listFiles[i].getPath()); + } + } + + private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String segmentId) { + String path = filePath.substring(0, filePath.lastIndexOf("/") + 1); + CarbonFile carbonFile = FileFactory.getCarbonFile(path); + return carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile file) { + return file.getName().endsWith(".minmaxindex"); + } + }); + } + + public MinMaxIndexBlockDetails[] readJson(String filePath) throws IOException { + Gson gsonObjectToRead = new Gson(); + DataInputStream dataInputStream = null; + BufferedReader buffReader = null; + InputStreamReader inStream = null; + MinMaxIndexBlockDetails[] readMinMax = null; + AtomicFileOperations fileOperation = + new AtomicFileOperationsImpl(filePath, FileFactory.getFileType(filePath)); + + try { + if (!FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath))) { + return null; + } + dataInputStream = fileOperation.openForRead(); + inStream = new InputStreamReader(dataInputStream, + CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT); + buffReader = new BufferedReader(inStream); + readMinMax = gsonObjectToRead.fromJson(buffReader, MinMaxIndexBlockDetails[].class); + } catch (IOException e) { + return null; + } finally { + CarbonUtil.closeStreams(buffReader, inStream, dataInputStream); + } + return readMinMax; + } + + /** + * Block Prunning logic for 
Min Max DataMap. + * + * @param filterExp + * @param segmentProperties + * @return + */ + @Override public List<Blocklet> prune(FilterResolverIntf filterExp, + SegmentProperties segmentProperties) { + List<Blocklet> blocklets = new ArrayList<>(); + + if (filterExp == null) { + for (int i = 0; i < readMinMaxDataMap.length; i++) { + blocklets.add(new Blocklet(readMinMaxDataMap[i].getFilePath(), + String.valueOf(readMinMaxDataMap[i].getBlockletId()))); + } + } else { + FilterExecuter filterExecuter = + FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null); + int startIndex = 0; + while (startIndex < readMinMaxDataMap.length) { + BitSet bitSet = filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(), + readMinMaxDataMap[startIndex].getMinValues()); + if (!bitSet.isEmpty()) { + blocklets.add(new Blocklet(readMinMaxDataMap[startIndex].getFilePath(), + String.valueOf(readMinMaxDataMap[startIndex].getBlockletId()))); + } + startIndex++; + } + } + return blocklets; + } + + @Override + public void clear() { + readMinMaxDataMap = null; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java new file mode 100644 index 0000000..b196d0d --- /dev/null +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.datamap.examples;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.carbondata.core.datamap.DataMapDistributable;
import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.dev.DataMap;
import org.apache.carbondata.core.datamap.dev.DataMapFactory;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.events.ChangeEvent;
import org.apache.carbondata.core.indexstore.schema.FilterType;
import org.apache.carbondata.core.memory.MemoryException;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;

/**
 * Factory for the example min/max datamap: creates the {@link MinMaxDataWriter} used at
 * load time and instantiates {@link MinMaxDataMap}s for pruning at query time.
 */
public class MinMaxDataMapFactory implements DataMapFactory {

  /** Identifier of the table this datamap is registered on. */
  private AbsoluteTableIdentifier identifier;

  @Override
  public void init(AbsoluteTableIdentifier identifier, String dataMapName) {
    this.identifier = identifier;
  }

  /**
   * Returns the writer that records min/max values while a segment is loaded.
   *
   * @param segmentId id of the segment being written
   * @return a fresh {@link MinMaxDataWriter}
   */
  @Override
  public DataMapWriter createWriter(String segmentId) {
    return new MinMaxDataWriter();
  }

  /**
   * Initializes and returns the min/max datamaps for one segment.
   *
   * @param segmentId id of the segment to load the index for
   * @return list containing the initialized {@link MinMaxDataMap}
   * @throws IOException if the index files cannot be loaded
   */
  @Override public List<DataMap> getDataMaps(String segmentId) throws IOException {
    List<DataMap> dataMapList = new ArrayList<>();
    // Form a dataMap of type MinMaxDataMap and point it at the segment directory.
    MinMaxDataMap dataMap = new MinMaxDataMap();
    try {
      dataMap.init(identifier.getTablePath() + "/Fact/Part0/Segment_" + segmentId + File.separator);
    } catch (MemoryException ex) {
      // FIX: the exception was silently swallowed by an empty catch block; surface it as
      // an IOException (already declared) so callers see the failed initialization.
      throw new IOException("Failed to initialize min max datamap for segment " + segmentId, ex);
    }
    dataMapList.add(dataMap);
    return dataMapList;
  }

  /**
   * Distributable form of this datamap; not supported by this example.
   *
   * @param segmentId id of the segment
   * @return always null
   */
  @Override public List<DataMapDistributable> toDistributable(String segmentId) {
    return null;
  }

  /**
   * Clear the datamap of one segment. Nothing is cached by this example, so no-op.
   *
   * @param segmentId id of the segment to clear
   */
  @Override public void clear(String segmentId) {
  }

  /**
   * Clear all cached datamaps. Nothing is cached by this example, so no-op.
   */
  @Override
  public void clear() {
  }

  @Override public DataMap getDataMap(DataMapDistributable distributable) {
    return null;
  }

  @Override
  public void fireEvent(ChangeEvent event) {
  }

  /**
   * Meta describing what this datamap indexes: equal-to filters on column "c2".
   */
  @Override
  public DataMapMeta getMeta() {
    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")), FilterType.EQUALTO);
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.datamap.examples;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;

import com.google.gson.Gson;

/**
 * Example DataMapWriter that tracks the min/max value of the indexed column while a block
 * is written and persists the per-blocklet min/max as a JSON ".minmaxindex" file next to
 * the carbondata block file.
 */
public class MinMaxDataWriter implements DataMapWriter {

  // FIX: the logger was created for TableInfo.class; use this class so the log origin is
  // reported correctly.
  private static final LogService LOGGER =
      LogServiceFactory.getLogService(MinMaxDataWriter.class.getName());

  /** Min/max accumulated over the pages seen so far (slot 1 holds the tracked column). */
  private byte[][] pageLevelMin, pageLevelMax;

  /** Min/max of the blocklets processed so far in the current block. */
  private byte[][] blockletLevelMin, blockletLevelMax;

  /** blockletId -> min/max, collected for the block currently being written. */
  private Map<Integer, BlockletMinMax> blockMinMaxMap;

  /** Full path of the carbondata file of the current block. */
  private String blockPath;

  @Override public void onBlockStart(String blockId, String blockPath) {
    // Reset all per-block state before the new block starts.
    pageLevelMax = null;
    pageLevelMin = null;
    blockletLevelMax = null;
    blockletLevelMin = null;
    blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
    this.blockPath = blockPath;
  }

  @Override public void onBlockEnd(String blockId) {
    updateMinMaxIndex(blockId);
  }

  @Override public void onBlockletStart(int blockletId) {
  }

  @Override public void onBlockletEnd(int blockletId) {
    updateBlockletMinMax(blockletId);
  }

  /**
   * Folds every row of the added page into the page-level min/max.
   *
   * As part of this example the value bytes are extracted manually (skipping a 2 byte
   * length prefix — TODO confirm the prefix width against the page encoding); the same
   * information could instead be obtained from the page statistics, e.g.
   * pages[0].getStatistics().getMin()/getMax().
   */
  @Override
  public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages) {
    // FIX: the original scanned rows only after the first page; the very first page
    // contributed row 0 alone. Every row of every page is now considered.
    for (int rowIndex = 0; rowIndex < pages[0].getPageSize(); rowIndex++) {
      byte[] rowBytes = pages[0].getBytes(rowIndex);
      // FIX: allocate a fresh array per row. The original reused a single buffer and
      // stored a reference to it as the min/max, so later rows silently overwrote the
      // previously recorded values.
      byte[] value = new byte[rowBytes.length - 2];
      System.arraycopy(rowBytes, 2, value, 0, value.length);
      if (pageLevelMin == null || pageLevelMax == null) {
        pageLevelMin = new byte[2][];
        pageLevelMax = new byte[2][];
        pageLevelMin[1] = value;
        pageLevelMax[1] = value;
      } else {
        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], value) > 0) {
          pageLevelMin[1] = value;
        }
        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], value) < 0) {
          pageLevelMax[1] = value;
        }
      }
    }
  }

  /**
   * Merges the page-level min/max into the blocklet-level min/max and records the result
   * for the given blocklet.
   */
  private void updateBlockletMinMax(int blockletId) {
    if (blockletLevelMax == null || blockletLevelMin == null) {
      blockletLevelMax = new byte[2][];
      blockletLevelMin = new byte[2][];
      if (pageLevelMax != null || pageLevelMin != null) {
        blockletLevelMin = pageLevelMin;
        blockletLevelMax = pageLevelMax;
      }
    } else {
      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMin[1], pageLevelMin[1]) > 0) {
        blockletLevelMin = pageLevelMin;
      }
      // FIX: the max must be replaced when the page max is LARGER (compare < 0); the
      // original compared > 0, replacing the stored max with smaller values.
      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMax[1], pageLevelMax[1]) < 0) {
        blockletLevelMax = pageLevelMax;
      }
    }
    BlockletMinMax blockletMinMax = new BlockletMinMax();
    blockletMinMax.setMax(blockletLevelMax);
    blockletMinMax.setMin(blockletLevelMin);
    blockMinMaxMap.put(blockletId, blockletMinMax);
  }

  public void updateMinMaxIndex(String blockId) {
    constructMinMaxIndex(blockId);
  }

  /**
   * Constructs the min/max index of every blocklet of the finished block and writes it out.
   *
   * @param blockId id of the block that just finished
   */
  public void constructMinMaxIndex(String blockId) {
    List<MinMaxIndexBlockDetails> tempMinMaxIndexBlockDetails = loadBlockDetails();
    try {
      writeMinMaxIndexFile(tempMinMaxIndexBlockDetails, blockPath, blockId);
    } catch (IOException ex) {
      // Best effort: the example datamap must not fail the data load, but log the failure
      // at error level (was info) so it is visible.
      LOGGER.error("Unable to write the min max index file for block " + blockId);
    }
  }

  /**
   * Copies the collected per-blocklet min/max into serializable MinMaxIndexBlockDetails.
   */
  private List<MinMaxIndexBlockDetails> loadBlockDetails() {
    List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails =
        new ArrayList<MinMaxIndexBlockDetails>();
    for (int index = 0; index < blockMinMaxMap.size(); index++) {
      // FIX: a new instance must be created per blocklet. The original reused a single
      // instance, so every entry of the returned list ended up holding the values of the
      // LAST blocklet.
      MinMaxIndexBlockDetails details = new MinMaxIndexBlockDetails();
      details.setMinValues(blockMinMaxMap.get(index).getMin());
      details.setMaxValues(blockMinMaxMap.get(index).getMax());
      details.setBlockletId(index);
      details.setFilePath(this.blockPath);
      minMaxIndexBlockDetails.add(details);
    }
    return minMaxIndexBlockDetails;
  }

  /**
   * Serializes the blocklet details as JSON into "&lt;blockId&gt;.minmaxindex" in the
   * directory of the block file.
   *
   * @param minMaxIndexBlockDetails per-blocklet min/max to persist
   * @param blockPath full path of the carbondata block file
   * @param blockId id of the block, used as the index file name
   * @throws IOException if the index file cannot be created or written
   */
  public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
      String blockPath, String blockId) throws IOException {
    String filePath = blockPath.substring(0, blockPath.lastIndexOf(File.separator) + 1) + blockId
        + ".minmaxindex";
    BufferedWriter brWriter = null;
    DataOutputStream dataOutStream = null;
    try {
      FileFactory.createNewFile(filePath, FileFactory.getFileType(filePath));
      dataOutStream = FileFactory.getDataOutputStream(filePath, FileFactory.getFileType(filePath));
      Gson gsonObjectToWrite = new Gson();
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
      brWriter.write(gsonObjectToWrite.toJson(minMaxIndexBlockDetails));
    } catch (IOException ioe) {
      // FIX: the method declares IOException but swallowed it; log at error level and
      // rethrow so the caller can react.
      LOGGER.error("Error in writing minMaxindex file " + filePath);
      throw ioe;
    } finally {
      if (null != brWriter) {
        brWriter.flush();
      }
      if (null != dataOutStream) {
        dataOutStream.flush();
      }
      CarbonUtil.closeStreams(brWriter, dataOutStream);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.datamap.examples;

import java.io.Serializable;

/**
 * Serializable min/max details of one blocklet. Instances are serialized to JSON by
 * MinMaxDataWriter and read back when the min/max index is loaded.
 */
public class MinMaxIndexBlockDetails implements Serializable {
  private static final long serialVersionUID = 1206104914911491724L;

  /**
   * Min value of each column of one blocklet, bit-packed.
   */
  private byte[][] minValues;

  /**
   * Max value of each column of one blocklet, bit-packed.
   */
  private byte[][] maxValues;

  /**
   * filePath pointing to the block.
   */
  private String filePath;

  /**
   * BlockletId of the block.
   * FIX: renamed from "BlockletId" to follow Java field naming conventions.
   * NOTE(review): Gson serializes by field name, so the JSON key in newly written
   * ".minmaxindex" files changes from "BlockletId" to "blockletId"; previously written
   * index files would need to be regenerated — confirm no persisted files must be kept.
   */
  private Integer blockletId;

  public byte[][] getMinValues() {
    return minValues;
  }

  public void setMinValues(byte[][] minValues) {
    this.minValues = minValues;
  }

  public byte[][] getMaxValues() {
    return maxValues;
  }

  public void setMaxValues(byte[][] maxValues) {
    this.maxValues = maxValues;
  }

  public String getFilePath() {
    return filePath;
  }

  public void setFilePath(String filePath) {
    this.filePath = filePath;
  }

  public Integer getBlockletId() {
    return blockletId;
  }

  public void setBlockletId(Integer blockletId) {
    this.blockletId = blockletId;
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.carbondata.datamap.examples

import java.io.File

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.CarbonProperties

/**
 * Example that registers the min/max datamap on table "carbonminmax", loads data so the
 * index gets written, then runs a filter query that can be pruned by the index.
 */
object MinMaxDataMapExample {
  def main(args: Array[String]): Unit = {

    val rootPath = new File(this.getClass.getResource("/").getPath
                            + "").getCanonicalPath
    // FIX: the store path used "dataMap/examples" while the module directory (see the
    // warehouse/metastore paths and the pom module entry) is "datamap/examples"; on a
    // case-sensitive file system the store landed in the wrong directory.
    val storeLocation = s"$rootPath/datamap/examples/target/store"
    val warehouse = s"$rootPath/datamap/examples/target/warehouse"
    val metastoredb = s"$rootPath/datamap/examples/target"

    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")

    import org.apache.spark.sql.CarbonSession._

    val spark = SparkSession
      .builder()
      .master("local")
      .appName("CarbonDataMapExample")
      .config("spark.sql.warehouse.dir", warehouse)
      .getOrCreateCarbonSession(storeLocation)

    spark.sparkContext.setLogLevel("ERROR")
    import spark.implicits._

    // register datamap writer
    DataMapStoreManager.getInstance().createAndRegisterDataMap(
      AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
      classOf[MinMaxDataMapFactory].getName,
      MinMaxDataMap.NAME)

    spark.sql("DROP TABLE IF EXISTS carbonminmax")

    val df = spark.sparkContext.parallelize(1 to 33000)
      .map(x => ("a", "b", x))
      .toDF("c1", "c2", "c3")

    // save dataframe to carbon file
    df.write
      .format("carbondata")
      .option("tableName", "carbonminmax")
      .mode(SaveMode.Overwrite)
      .save()

    // Query the table.
    spark.sql("select c2 from carbonminmax").show(20, false)
    spark.sql("select c2 from carbonminmax where c2 = 'b'").show(20, false)
    spark.sql("DROP TABLE IF EXISTS carbonminmax")

  }
}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index c3f5d0a..c80ee2b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -191,7 +191,7 @@ object DataMapWriterSuite {
     callbackSeq :+= s"blocklet start $blockletId"
   }

-  override def onBlockStart(blockId: String): Unit = {
+  override def onBlockStart(blockId: String, blockPath: String): Unit = {
     callbackSeq :+= s"block start $blockId"
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 423f2dc..0b9917d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -437,6 +437,7 @@
     <module>format</module>
     <module>integration/spark2</module>
     <module>examples/spark2</module>
+    <module>datamap/examples</module>
     <module>integration/hive</module>
     <module>integration/presto</module>
     <module>examples/flink</module>
@@ -473,6 +474,7 @@
     <module>integration/presto</module>
     <module>streaming</module>
<module>examples/spark2</module> + <module>datamap/examples</module> </modules> <build> <plugins> http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index d739f8c..6fbbd3e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -83,10 +83,10 @@ public class DataMapWriterListener { LOG.info("DataMapWriter " + writer + " added"); } - public void onBlockStart(String blockId) { + public void onBlockStart(String blockId, String blockPath) { for (List<DataMapWriter> writers : registry.values()) { for (DataMapWriter writer : writers) { - writer.onBlockStart(blockId); + writer.onBlockStart(blockId, blockPath); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index c0b8065..02391cf 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -269,7 +269,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { private void notifyDataMapBlockStart() { if (listener != null) { - 
listener.onBlockStart(carbonDataFileName); + listener.onBlockStart(carbonDataFileName, constructFactFileFullPath()); } } @@ -280,6 +280,11 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { blockletId = 0; } + private String constructFactFileFullPath() { + String factFilePath = + this.dataWriterVo.getCarbonDataDirectoryPath() + File.separator + this.carbonDataFileName; + return factFilePath; + } /** * Finish writing current file. It will flush stream, copy and rename temp file to final file * @param copyInCurrentThread set to false if want to do data copy in a new thread http://git-wip-us.apache.org/repos/asf/carbondata/blob/6882f737/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index ddf444d..80d8154 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -43,6 +43,7 @@ import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; import org.apache.carbondata.processing.store.TablePage; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; + /** * Below class will be used to write the data in V3 format * <Column1 Data ChunkV3><Column1<Page1><Page2><Page3><Page4>> @@ -157,6 +158,8 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter { } } + + /** * Write the collect blocklet data (blockletDataHolder) to file */