Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2963#discussion_r244007049
--- Diff:
datamap/example/src/main/java/org/apache/carbondata/datamap/minmax/MinMaxDataMapFactory.java
---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap.minmax;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.Cache;
+import org.apache.carbondata.core.cache.CacheProvider;
+import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.datamap.DataMapDistributable;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.DataMapMeta;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.TableDataMap;
+import org.apache.carbondata.core.datamap.dev.DataMapBuilder;
+import org.apache.carbondata.core.datamap.dev.DataMapWriter;
+import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
+import
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.features.TableOperation;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.events.Event;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Min Max DataMap Factory
+ */
[email protected]
+public class MinMaxDataMapFactory extends CoarseGrainDataMapFactory {
+ private static final Logger LOGGER =
+
LogServiceFactory.getLogService(MinMaxDataMapFactory.class.getName());
+ // metadata of this datamap: indexed columns and supported filter expressions
+ private DataMapMeta dataMapMeta;
+ // name of this datamap, taken from its DataMapSchema in the constructor
+ private String dataMapName;
+ // segmentId -> list of index files
+ private Map<String, Set<String>> segmentMap = new ConcurrentHashMap<>();
+ // cache of loaded minmax index data, keyed by shard path (see clear(Segment));
+ // created as a singleton via CacheProvider in the constructor
+ private Cache<MinMaxDataMapCacheKeyValue.Key,
MinMaxDataMapCacheKeyValue.Value> cache;
+
+ public MinMaxDataMapFactory(CarbonTable carbonTable, DataMapSchema
dataMapSchema)
+ throws MalformedDataMapCommandException {
+ super(carbonTable, dataMapSchema);
+
+ // this is an example for datamap, we can choose the columns and
operations that
+ // will be supported by this datamap. Furthermore, we can add
cache-support for this datamap.
+
+ this.dataMapName = dataMapSchema.getDataMapName();
+ List<CarbonColumn> indexedColumns =
carbonTable.getIndexedColumns(dataMapSchema);
+
+ // operations that will be supported on the indexed columns
+ List<ExpressionType> optOperations = new ArrayList<>();
+ optOperations.add(ExpressionType.NOT);
+ optOperations.add(ExpressionType.EQUALS);
+ optOperations.add(ExpressionType.NOT_EQUALS);
+ optOperations.add(ExpressionType.GREATERTHAN);
+ optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
+ optOperations.add(ExpressionType.LESSTHAN);
+ optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
+ optOperations.add(ExpressionType.IN);
+ this.dataMapMeta = new DataMapMeta(indexedColumns, optOperations);
+
+ // init cache. note that the createCache ensures the singleton of the
cache
+ try {
+ this.cache = CacheProvider.getInstance()
+ .createCache(new CacheType("minmax_cache"),
MinMaxDataMapCache.class.getName());
+ } catch (Exception e) {
+ LOGGER.error("Failed to create cache for minmax datamap", e);
+ throw new MalformedDataMapCommandException(e.getMessage());
+ }
+ }
+
+ /**
+ * createWriter will return the MinMaxDataWriter.
+ *
+ * @param segment
+ * @param shardName
+ * @return
+ */
+ @Override
+ public DataMapWriter createWriter(Segment segment, String shardName,
+ SegmentProperties segmentProperties) throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(String.format(
+ "Data of MinMaxDataMap %s for table %s will be written to %s",
+ dataMapName, getCarbonTable().getTableName(), shardName));
+ }
+ return new MinMaxDataMapDirectWriter(getCarbonTable().getTablePath(),
dataMapName,
+ dataMapMeta.getIndexedColumns(), segment, shardName,
segmentProperties);
+ }
+
+ @Override
+ public DataMapBuilder createBuilder(Segment segment, String shardName,
+ SegmentProperties segmentProperties) throws IOException {
+ return new MinMaxDataMapBuilder(getCarbonTable().getTablePath(),
dataMapName,
+ dataMapMeta.getIndexedColumns(), segment, shardName,
segmentProperties);
+ }
+
+ /**
+ * getDataMaps Factory method Initializes the Min Max Data Map and
returns.
+ *
+ * @param segment
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public List<CoarseGrainDataMap> getDataMaps(Segment segment) throws
IOException {
+ List<CoarseGrainDataMap> dataMaps = new ArrayList<>();
+ Set<String> shardPaths = segmentMap.get(segment.getSegmentNo());
+ if (shardPaths == null) {
+ String dataMapStorePath = DataMapWriter.getDefaultDataMapPath(
+ getCarbonTable().getTablePath(), segment.getSegmentNo(),
dataMapName);
+ CarbonFile[] carbonFiles =
FileFactory.getCarbonFile(dataMapStorePath).listFiles();
+ shardPaths = new HashSet<>();
+ for (CarbonFile carbonFile : carbonFiles) {
+ shardPaths.add(carbonFile.getAbsolutePath());
+ }
+ segmentMap.put(segment.getSegmentNo(), shardPaths);
+ }
+
+ for (String shard : shardPaths) {
+ MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
+ dataMap.init(new MinMaxDataMapModel(shard, cache,
segment.getConfiguration()));
+ dataMap.initOthers(getCarbonTable(),
dataMapMeta.getIndexedColumns());
+ dataMaps.add(dataMap);
+ }
+ return dataMaps;
+ }
+
+ @Override
+ public DataMapMeta getMeta() {
+ return this.dataMapMeta;
+ }
+
+ @Override
+ public DataMapLevel getDataMapLevel() {
+ return DataMapLevel.CG;
+ }
+
+ @Override
+ public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable
distributable)
+ throws IOException {
+ List<CoarseGrainDataMap> coarseGrainDataMaps = new ArrayList<>();
+ MinMaxIndexDataMap minMaxIndexDataMap = new MinMaxIndexDataMap();
+ String indexPath = ((MinMaxDataMapDistributable)
distributable).getIndexPath();
+ minMaxIndexDataMap.init(
+ new MinMaxDataMapModel(indexPath, cache,
FileFactory.getConfiguration()));
+ minMaxIndexDataMap.initOthers(getCarbonTable(),
dataMapMeta.getIndexedColumns());
+ coarseGrainDataMaps.add(minMaxIndexDataMap);
+ return coarseGrainDataMaps;
+ }
+
+ /**
+ * returns all the directories of lucene index files for query
+ * Note: copied from BloomFilterDataMapFactory, will extract to a common
interface
+ */
+ private CarbonFile[] getAllIndexDirs(String tablePath, String segmentId)
{
+ List<CarbonFile> indexDirs = new ArrayList<>();
+ List<TableDataMap> dataMaps;
+ try {
+ // there can be multiple bloom datamaps present on a table, so get
all datamaps and form
+ // the path till the index file directories in all datamaps folders
present in each segment
+ dataMaps =
DataMapStoreManager.getInstance().getAllDataMap(getCarbonTable());
+ } catch (IOException ex) {
+ LOGGER.error(String.format(
+ "failed to get datamaps for tablePath %s, segmentId %s",
tablePath, segmentId), ex);
+ throw new RuntimeException(ex);
+ }
+ if (dataMaps.size() > 0) {
+ for (TableDataMap dataMap : dataMaps) {
+ if
(dataMap.getDataMapSchema().getDataMapName().equals(this.dataMapName)) {
+ List<CarbonFile> indexFiles;
+ String dmPath = CarbonTablePath.getDataMapStorePath(tablePath,
segmentId,
+ dataMap.getDataMapSchema().getDataMapName());
+ final CarbonFile dirPath = FileFactory.getCarbonFile(dmPath);
+ indexFiles = Arrays.asList(dirPath.listFiles(new
CarbonFileFilter() {
+ @Override
+ public boolean accept(CarbonFile file) {
+ return file.isDirectory();
+ }
+ }));
+ indexDirs.addAll(indexFiles);
+ }
+ }
+ }
+ return indexDirs.toArray(new CarbonFile[0]);
+ }
+
+ @Override
+ public List<DataMapDistributable> toDistributable(Segment segment) {
+ List<DataMapDistributable> dataMapDistributableList = new
ArrayList<>();
+ CarbonFile[] indexDirs =
+ getAllIndexDirs(getCarbonTable().getTablePath(),
segment.getSegmentNo());
+ if (segment.getFilteredIndexShardNames().size() == 0) {
+ for (CarbonFile indexDir : indexDirs) {
+ DataMapDistributable bloomDataMapDistributable =
+ new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
+ dataMapDistributableList.add(bloomDataMapDistributable);
+ }
+ return dataMapDistributableList;
+ }
+ for (CarbonFile indexDir : indexDirs) {
+ // Filter out the tasks which are filtered through CG datamap.
+ if
(!segment.getFilteredIndexShardNames().contains(indexDir.getName())) {
+ continue;
+ }
+ DataMapDistributable bloomDataMapDistributable =
+ new MinMaxDataMapDistributable(indexDir.getAbsolutePath());
+ dataMapDistributableList.add(bloomDataMapDistributable);
+ }
+ return dataMapDistributableList;
+ }
+
+ @Override
+ public void fireEvent(Event event) {
+
+ }
+
+ @Override
+ public void clear(Segment segment) {
+ Set<String> shards = segmentMap.remove(segment.getSegmentNo());
+ if (null != shards) {
+ for (String shard : shards) {
+ cache.invalidate(new MinMaxDataMapCacheKeyValue.Key(shard));
+ }
+ }
+ }
+
+ @Override
+ public synchronized void clear() {
+ if (segmentMap.size() > 0) {
+ List<String> segments = new ArrayList<>(segmentMap.keySet());
+ for (String segmentId : segments) {
+ clear(new Segment(segmentId, null, null));
+ }
+ }
+ }
+
+ @Override
+ public void deleteDatamapData(Segment segment) throws IOException {
+ try {
+ String segmentId = segment.getSegmentNo();
+ String datamapPath = CarbonTablePath
+ .getDataMapStorePath(getCarbonTable().getTablePath(), segmentId,
dataMapName);
+ if (FileFactory.isFileExist(datamapPath)) {
+ CarbonFile file = FileFactory.getCarbonFile(datamapPath);
+ CarbonUtil.deleteFoldersAndFilesSilent(file);
+ }
+ } catch (InterruptedException ex) {
+ throw new IOException("Failed to delete datamap for segment_" +
segment.getSegmentNo());
+ }
+ }
+
+ @Override
+ public void deleteDatamapData() {
+ SegmentStatusManager ssm =
+ new
SegmentStatusManager(getCarbonTable().getAbsoluteTableIdentifier());
+ try {
+ List<Segment> validSegments =
ssm.getValidAndInvalidSegments().getValidSegments();
+ for (Segment segment : validSegments) {
+ deleteDatamapData(segment);
+ }
+ } catch (IOException e) {
+ LOGGER.error("drop datamap failed, failed to delete datamap
directory");
+ }
+ }
+
+ @Override
+ public boolean willBecomeStale(TableOperation operation) {
+ switch (operation) {
+ case ALTER_DROP:
+ case ALTER_CHANGE_DATATYPE:
+ case DELETE:
+ case UPDATE:
+ case PARTITION:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ @Override
+ public boolean isOperationBlocked(TableOperation operation, Object...
targets) {
+ switch (operation) {
+ case ALTER_DROP: {
--- End diff --
The `{` braces around the case body seem unnecessary — were they added just to avoid using `break`?
---