Repository: carbondata
Updated Branches:
  refs/heads/master 4c9bed8bc -> ecd6c0c54


[CARBONDATA-2350][DataMap] Fix bugs in minmax datamap example

Fix bugs in minmax datamap example

This closes #2174


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ecd6c0c5
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ecd6c0c5
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ecd6c0c5

Branch: refs/heads/master
Commit: ecd6c0c54405c91434d1cbca2894b635318f7e4a
Parents: 4c9bed8
Author: xuchuanyin <xuchuan...@hust.edu.cn>
Authored: Mon Apr 16 10:55:10 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Tue Apr 17 01:05:32 2018 +0800

----------------------------------------------------------------------
 .../core/datastore/page/ColumnPage.java         |  57 ++++++
 .../datamap/examples/MinMaxDataWriter.java      | 195 ++++++++++++++-----
 .../datamap/examples/MinMaxIndexDataMap.java    |  77 +++++---
 .../examples/MinMaxIndexDataMapFactory.java     |  60 +++++-
 .../MinMaxDataMapExample.scala                  |  77 --------
 .../datamap/examples/MinMaxDataMapSuite.scala   | 127 ++++++++++++
 .../datamap/lucene/LuceneDataMapWriter.java     |   2 +-
 7 files changed, 433 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index ebca3b7..68269fb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -370,6 +370,38 @@ public abstract class ColumnPage {
   }
 
   /**
+   * get value at rowId, note that the value of string&bytes is LV format
+   * @param rowId rowId
+   * @return value
+   */
+  public Object getData(int rowId) {
+    if (nullBitSet.get(rowId)) {
+      return getNull(rowId);
+    }
+    if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.BYTE) {
+      byte value = getByte(rowId);
+      if (columnSpec.getSchemaDataType() == DataTypes.BOOLEAN) {
+        return BooleanConvert.byte2Boolean(value);
+      }
+      return value;
+    } else if (dataType == DataTypes.SHORT) {
+      return getShort(rowId);
+    } else if (dataType == DataTypes.INT) {
+      return getInt(rowId);
+    } else if (dataType == DataTypes.LONG) {
+      return getLong(rowId);
+    } else if (dataType == DataTypes.DOUBLE) {
+      return getDouble(rowId);
+    } else if (DataTypes.isDecimal(dataType)) {
+      return getDecimal(rowId);
+    } else if (dataType == DataTypes.STRING || dataType == 
DataTypes.BYTE_ARRAY) {
+      return getBytes(rowId);
+    } else {
+      throw new RuntimeException("unsupported data type: " + dataType);
+    }
+  }
+
+  /**
    * Set byte value at rowId
    */
   public abstract void putByte(int rowId, byte value);
@@ -446,6 +478,31 @@ public abstract class ColumnPage {
   }
 
   /**
+   * Get null at rowId
+   */
+  private Object getNull(int rowId) {
+    Object result;
+    if (dataType == DataTypes.BOOLEAN) {
+      result = getBoolean(rowId);
+    } else if (dataType == DataTypes.BYTE) {
+      result = getByte(rowId);
+    } else if (dataType == DataTypes.SHORT) {
+      result = getShort(rowId);
+    } else if (dataType == DataTypes.INT) {
+      result = getInt(rowId);
+    } else if (dataType == DataTypes.LONG) {
+      result = getLong(rowId);
+    } else if (dataType == DataTypes.DOUBLE) {
+      result = getDouble(rowId);
+    } else if (DataTypes.isDecimal(dataType)) {
+      result = getDecimal(rowId);
+    } else {
+      throw new IllegalArgumentException("unsupported data type: " + dataType);
+    }
+    return result;
+  }
+
+  /**
    * Get byte value at rowId
    */
   public abstract byte getByte(int rowId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index e116825..d2dbaa5 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.datamap.examples;
 
 import java.io.BufferedWriter;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
@@ -33,39 +34,63 @@ import org.apache.carbondata.core.datamap.dev.DataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public class MinMaxDataWriter extends DataMapWriter {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(TableInfo.class.getName());
 
-  private byte[][] pageLevelMin, pageLevelMax;
-
-  private byte[][] blockletLevelMin, blockletLevelMax;
+  private Object[] pageLevelMin, pageLevelMax;
 
   private Map<Integer, BlockletMinMax> blockMinMaxMap;
 
-  private String dataWritePath;
+  private String dataMapName;
+  private int columnCnt;
+  private DataType[] dataTypeArray;
 
-  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, Segment segment,
+  /**
+   * Since the sequence of indexed columns is defined the same as order in 
user-created, so
+   * map colIdx in user-created to colIdx in MinMaxIndex.
+   * Please note that the sequence of min-max values for each column in 
blocklet-min-max is not
+   * the same as indexed columns, so we need to reorder the origin while 
writing the min-max values
+   */
+  private Map<Integer, Integer> origin2MinMaxOrdinal = new HashMap<>();
+
+  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String 
dataMapName, Segment segment,
       String dataWritePath) {
     super(identifier, segment, dataWritePath);
-    this.identifier = identifier;
-    this.segmentId = segment.getSegmentNo();
-    this.dataWritePath = dataWritePath;
+    this.dataMapName = dataMapName;
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+        identifier.getDatabaseName(), identifier.getTableName());
+    List<CarbonColumn> cols = 
carbonTable.getCreateOrderColumn(identifier.getTableName());
+    this.columnCnt = cols.size();
+    List<CarbonDimension> dimensions = 
carbonTable.getDimensionByTableName(identifier.getTableName());
+    for (int i = 0; i < dimensions.size(); i++) {
+      this.origin2MinMaxOrdinal.put(dimensions.get(i).getSchemaOrdinal(),
+          dimensions.get(i).getOrdinal());
+    }
+    List<CarbonMeasure> measures = 
carbonTable.getMeasureByTableName(identifier.getTableName());
+    for (int i = 0; i < measures.size(); i++) {
+      this.origin2MinMaxOrdinal.put(measures.get(i).getSchemaOrdinal(),
+          dimensions.size() + measures.get(i).getOrdinal());
+    }
   }
 
   @Override public void onBlockStart(String blockId) {
-    pageLevelMax = null;
-    pageLevelMin = null;
-    blockletLevelMax = null;
-    blockletLevelMin = null;
-    blockMinMaxMap = null;
     blockMinMaxMap = new HashMap<Integer, BlockletMinMax>();
   }
 
@@ -74,10 +99,12 @@ public class MinMaxDataWriter extends DataMapWriter {
   }
 
   @Override public void onBlockletStart(int blockletId) {
+    pageLevelMin = new Object[columnCnt];
+    pageLevelMax = new Object[columnCnt];
   }
 
   @Override public void onBlockletEnd(int blockletId) {
-    updateBlockletMinMax(blockletId);
+    updateCurrentBlockletMinMax(blockletId);
   }
 
   @Override
@@ -106,48 +133,99 @@ public class MinMaxDataWriter extends DataMapWriter {
     //          pages[0].getStatistics().getMax());
     //    }
 
-    byte[] value = new byte[pages[0].getBytes(0).length - 2];
-    if (pageLevelMin == null && pageLevelMax == null) {
-      pageLevelMin = new byte[2][];
-      pageLevelMax = new byte[2][];
-
-      System.arraycopy(pages[0].getBytes(0), 2, value, 0, value.length);
-      pageLevelMin[1] = value;
-      pageLevelMax[1] = value;
+    if (this.dataTypeArray == null) {
+      this.dataTypeArray = new DataType[this.columnCnt];
+      for (int i = 0; i < this.columnCnt; i++) {
+        this.dataTypeArray[i] = pages[i].getDataType();
+      }
+    }
 
-    } else {
-      for (int rowIndex = 0; rowIndex < pages[0].getPageSize(); rowIndex++) {
-        System.arraycopy(pages[0].getBytes(rowIndex), 2, value, 0, 
value.length);
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMin[1], value) 
> 0) {
-          pageLevelMin[1] = value;
-        }
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(pageLevelMax[1], value) 
< 0) {
-          pageLevelMax[1] = value;
+    // as an example, we don't use page-level min-max generated by native 
carbondata here, we get
+    // the min-max by comparing each row
+    for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
+      for (int colIdx = 0; colIdx < columnCnt; colIdx++) {
+        Object originValue = pages[colIdx].getData(rowId);
+        // for string & bytes_array, data is prefixed with length, need to 
remove it
+        if (DataTypes.STRING == pages[colIdx].getDataType()
+            || DataTypes.BYTE_ARRAY == pages[colIdx].getDataType()) {
+          byte[] valueMin0 = (byte[]) pageLevelMin[colIdx];
+          byte[] valueMax0 = (byte[]) pageLevelMax[colIdx];
+          byte[] value1 = (byte[]) originValue;
+          if (pageLevelMin[colIdx] == null || ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(valueMin0, 0, valueMin0.length, value1, 2, 
value1.length - 2) > 0) {
+            pageLevelMin[colIdx] = new byte[value1.length - 2];
+            System.arraycopy(value1, 2, (byte[]) pageLevelMin[colIdx], 0, 
value1.length - 2);
+          }
+          if (pageLevelMax[colIdx] == null || ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(valueMax0, 0, valueMax0.length, value1, 2, 
value1.length - 2) < 0) {
+            pageLevelMax[colIdx] = new byte[value1.length - 2];
+            System.arraycopy(value1, 2, (byte[]) pageLevelMax[colIdx], 0, 
value1.length - 2);
+          }
+        } else if (DataTypes.INT == pages[colIdx].getDataType()) {
+          updateMinMax(colIdx, originValue, pages[colIdx].getDataType());
+        } else {
+          throw new RuntimeException("Not implement yet");
         }
       }
     }
   }
 
-  private void updateBlockletMinMax(int blockletId) {
-    if (blockletLevelMax == null || blockletLevelMin == null) {
-      blockletLevelMax = new byte[2][];
-      blockletLevelMin = new byte[2][];
-      if (pageLevelMax != null || pageLevelMin != null) {
-        blockletLevelMin = pageLevelMin;
-        blockletLevelMax = pageLevelMax;
+  private void updateMinMax(int colIdx, Object originValue, DataType dataType) 
{
+    if (pageLevelMin[colIdx] == null) {
+      pageLevelMin[colIdx] = originValue;
+    }
+    if (pageLevelMax[colIdx] == null) {
+      pageLevelMax[colIdx] = originValue;
+    }
+
+    if (DataTypes.SHORT == dataType) {
+      if (pageLevelMin[colIdx] == null || (short) pageLevelMin[colIdx] - 
(short) originValue > 0) {
+        pageLevelMin[colIdx] = originValue;
       }
-    } else {
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMin[1], 
pageLevelMin[1]) > 0) {
-        blockletLevelMin = pageLevelMin;
+      if (pageLevelMax[colIdx] == null || (short) pageLevelMax[colIdx] - 
(short) originValue < 0) {
+        pageLevelMax[colIdx] = originValue;
       }
-
-      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(blockletLevelMax[1], 
pageLevelMax[1]) > 0) {
-        blockletLevelMax = pageLevelMax;
+    } else if (DataTypes.INT == dataType) {
+      if (pageLevelMin[colIdx] == null || (int) pageLevelMin[colIdx] - (int) 
originValue > 0) {
+        pageLevelMin[colIdx] = originValue;
+      }
+      if (pageLevelMax[colIdx] == null || (int) pageLevelMax[colIdx] - (int) 
originValue < 0) {
+        pageLevelMax[colIdx] = originValue;
       }
+    } else if (DataTypes.LONG == dataType) {
+      if (pageLevelMin[colIdx] == null || (long) pageLevelMin[colIdx] - (long) 
originValue > 0) {
+        pageLevelMin[colIdx] = originValue;
+      }
+      if (pageLevelMax[colIdx] == null || (long) pageLevelMax[colIdx] - (long) 
originValue < 0) {
+        pageLevelMax[colIdx] = originValue;
+      }
+    } else if (DataTypes.DOUBLE == dataType) {
+      if (pageLevelMin[colIdx] == null
+          || (double) pageLevelMin[colIdx] - (double) originValue > 0) {
+        pageLevelMin[colIdx] = originValue;
+      }
+      if (pageLevelMax[colIdx] == null
+          || (double) pageLevelMax[colIdx] - (double) originValue < 0) {
+        pageLevelMax[colIdx] = originValue;
+      }
+    } else {
+      // todo:
+      throw new RuntimeException("Not implemented yet");
     }
+  }
+
+  private void updateCurrentBlockletMinMax(int blockletId) {
+    byte[][] max = new byte[this.columnCnt][];
+    byte[][] min = new byte[this.columnCnt][];
+    for (int i = 0; i < this.columnCnt; i++) {
+      int targetColIdx = origin2MinMaxOrdinal.get(i);
+      max[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], 
pageLevelMax[i]);
+      min[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], 
pageLevelMin[i]);
+    }
+
     BlockletMinMax blockletMinMax = new BlockletMinMax();
-    blockletMinMax.setMax(blockletLevelMax);
-    blockletMinMax.setMin(blockletLevelMin);
+    blockletMinMax.setMax(max);
+    blockletMinMax.setMin(min);
     blockMinMaxMap.put(blockletId, blockletMinMax);
   }
 
@@ -156,8 +234,6 @@ public class MinMaxDataWriter extends DataMapWriter {
     constructMinMaxIndex(blockId);
   }
 
-
-
   /**
    * Construct the Min Max Index.
    * @param blockId
@@ -178,9 +254,9 @@ public class MinMaxDataWriter extends DataMapWriter {
    */
   private List<MinMaxIndexBlockDetails> loadBlockDetails() {
     List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails = new 
ArrayList<MinMaxIndexBlockDetails>();
-    MinMaxIndexBlockDetails tmpminMaxIndexBlockDetails = new 
MinMaxIndexBlockDetails();
 
     for (int index = 0; index < blockMinMaxMap.size(); index++) {
+      MinMaxIndexBlockDetails tmpminMaxIndexBlockDetails = new 
MinMaxIndexBlockDetails();
       
tmpminMaxIndexBlockDetails.setMinValues(blockMinMaxMap.get(index).getMin());
       
tmpminMaxIndexBlockDetails.setMaxValues(blockMinMaxMap.get(index).getMax());
       tmpminMaxIndexBlockDetails.setBlockletId(index);
@@ -197,7 +273,8 @@ public class MinMaxDataWriter extends DataMapWriter {
    */
   public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> 
minMaxIndexBlockDetails,
       String blockId) throws IOException {
-    String filePath = dataWritePath +"/" + blockId + ".minmaxindex";
+    String dataMapDir = genDataMapStorePath(this.writeDirectoryPath, 
this.dataMapName);
+    String filePath = dataMapDir + File.separator + blockId + ".minmaxindex";
     BufferedWriter brWriter = null;
     DataOutputStream dataOutStream = null;
     try {
@@ -209,6 +286,7 @@ public class MinMaxDataWriter extends DataMapWriter {
       brWriter.write(minmaxIndexData);
     } catch (IOException ioe) {
       LOGGER.info("Error in writing minMaxindex file");
+      throw ioe;
     } finally {
       if (null != brWriter) {
         brWriter.flush();
@@ -224,4 +302,23 @@ public class MinMaxDataWriter extends DataMapWriter {
   @Override public void finish() throws IOException {
 
   }
+
+  /**
+   * create and return path that will store the datamap
+   *
+   * @param dataPath patch to store the carbondata factdata
+   * @param dataMapName datamap name
+   * @return path to store the datamap
+   * @throws IOException
+   */
+  public static String genDataMapStorePath(String dataPath, String dataMapName)
+      throws IOException {
+    String dmDir = dataPath + File.separator + dataMapName;
+    Path dmPath = FileFactory.getPath(dmDir);
+    FileSystem fs = FileFactory.getFileSystem(dmPath);
+    if (!fs.exists(dmPath)) {
+      fs.mkdirs(dmPath);
+    }
+    return dmDir;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
index bad22b2..ac6358e 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMap.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.datamap.examples;
 
 import java.io.BufferedReader;
 import java.io.DataInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
@@ -30,14 +31,11 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapModel;
 import org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.row.DataMapRow;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
@@ -45,6 +43,10 @@ import 
org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import com.google.gson.Gson;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 
 /**
  * Datamap implementation for min max blocklet.
@@ -54,29 +56,36 @@ public class MinMaxIndexDataMap extends CoarseGrainDataMap {
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(MinMaxIndexDataMap.class.getName());
 
-  public static final String NAME = "clustered.minmax.btree.blocklet";
+  private String[] indexFilePath;
 
-  private String filePath;
-
-  private MinMaxIndexBlockDetails[] readMinMaxDataMap;
+  private MinMaxIndexBlockDetails[][] readMinMaxDataMap;
 
   @Override
   public void init(DataMapModel model) throws MemoryException, IOException {
-    this.filePath = model.getFilePath();
-    CarbonFile[] listFiles = getCarbonMinMaxIndexFiles(filePath, "0");
-    for (int i = 0; i < listFiles.length; i++) {
-      readMinMaxDataMap = readJson(listFiles[i].getPath());
+    Path indexPath = FileFactory.getPath(model.getFilePath());
+
+    FileSystem fs = FileFactory.getFileSystem(indexPath);
+    if (!fs.exists(indexPath)) {
+      throw new IOException(
+          String.format("Path %s for MinMax index dataMap does not exist", 
indexPath));
+    }
+    if (!fs.isDirectory(indexPath)) {
+      throw new IOException(
+          String.format("Path %s for MinMax index dataMap must be a 
directory", indexPath));
     }
-  }
 
-  private CarbonFile[] getCarbonMinMaxIndexFiles(String filePath, String 
segmentId) {
-    String path = filePath.substring(0, filePath.lastIndexOf("/") + 1);
-    CarbonFile carbonFile = FileFactory.getCarbonFile(path);
-    return carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return file.getName().endsWith(".minmaxindex");
+    FileStatus[] indexFileStatus = fs.listStatus(indexPath, new PathFilter() {
+      @Override public boolean accept(Path path) {
+        return path.getName().endsWith(".minmaxindex");
       }
     });
+
+    this.indexFilePath = new String[indexFileStatus.length];
+    this.readMinMaxDataMap = new 
MinMaxIndexBlockDetails[indexFileStatus.length][];
+    for (int i = 0; i < indexFileStatus.length; i++) {
+      this.indexFilePath[i] = indexFileStatus[i].getPath().toString();
+      this.readMinMaxDataMap[i] = readJson(this.indexFilePath[i]);
+    }
   }
 
   private MinMaxIndexBlockDetails[] readJson(String filePath) {
@@ -118,20 +127,34 @@ public class MinMaxIndexDataMap extends 
CoarseGrainDataMap {
 
     if (filterExp == null) {
       for (int i = 0; i < readMinMaxDataMap.length; i++) {
-        blocklets.add(new Blocklet(filePath, 
String.valueOf(readMinMaxDataMap[i].getBlockletId())));
+        for (int j = 0; j < readMinMaxDataMap[i].length; j++) {
+          blocklets.add(new Blocklet(indexFilePath[i],
+              String.valueOf(readMinMaxDataMap[i][j].getBlockletId())));
+        }
       }
     } else {
       FilterExecuter filterExecuter =
           FilterUtil.getFilterExecuterTree(filterExp, segmentProperties, null);
-      int startIndex = 0;
-      while (startIndex < readMinMaxDataMap.length) {
-        BitSet bitSet = 
filterExecuter.isScanRequired(readMinMaxDataMap[startIndex].getMaxValues(),
-            readMinMaxDataMap[startIndex].getMinValues());
-        if (!bitSet.isEmpty()) {
-          blocklets.add(new Blocklet(filePath,
-              String.valueOf(readMinMaxDataMap[startIndex].getBlockletId())));
+      for (int blkIdx = 0; blkIdx < readMinMaxDataMap.length; blkIdx++) {
+        for (int blkltIdx = 0; blkltIdx < readMinMaxDataMap[blkIdx].length; 
blkltIdx++) {
+
+          BitSet bitSet = filterExecuter.isScanRequired(
+              readMinMaxDataMap[blkIdx][blkltIdx].getMaxValues(),
+              readMinMaxDataMap[blkIdx][blkltIdx].getMinValues());
+          if (!bitSet.isEmpty()) {
+            String blockFileName = indexFilePath[blkIdx].substring(
+                indexFilePath[blkIdx].lastIndexOf(File.separatorChar) + 1,
+                indexFilePath[blkIdx].indexOf(".minmaxindex"));
+            Blocklet blocklet = new Blocklet(blockFileName,
+                
String.valueOf(readMinMaxDataMap[blkIdx][blkltIdx].getBlockletId()));
+            LOGGER.info(String.format("MinMaxDataMap: Need to scan block#%s -> 
blocklet#%s, %s",
+                blkIdx, blkltIdx, blocklet));
+            blocklets.add(blocklet);
+          } else {
+            LOGGER.info(String.format("MinMaxDataMap: Skip scan block#%s -> 
blocklet#%s",
+                blkIdx, blkltIdx));
+          }
         }
-        startIndex++;
       }
     }
     return blocklets;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
index a2f92c9..758a67c 100644
--- 
a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
+++ 
b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxIndexDataMapFactory.java
@@ -19,9 +19,11 @@ package org.apache.carbondata.datamap.examples;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 
+import 
org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapDistributable;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.Segment;
@@ -31,21 +33,61 @@ import 
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMap;
 import 
org.apache.carbondata.core.datamap.dev.cgdatamap.CoarseGrainDataMapFactory;
 import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.readcommitter.ReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.Transformer;
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * Min Max DataMap Factory
  */
 public class MinMaxIndexDataMapFactory extends CoarseGrainDataMapFactory {
-
+  private static final LogService LOGGER = LogServiceFactory.getLogService(
+      MinMaxIndexDataMapFactory.class.getName());
+  private DataMapMeta dataMapMeta;
+  private String dataMapName;
   private AbsoluteTableIdentifier identifier;
 
-  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema 
dataMapSchema) {
+  // this is an example for datamap, we can choose the columns and operations 
that
+  // will be supported by this datamap. Furthermore, we can add cache-support 
for this datamap.
+  @Override public void init(AbsoluteTableIdentifier identifier, DataMapSchema 
dataMapSchema)
+      throws IOException, MalformedDataMapCommandException {
     this.identifier = identifier;
+    this.dataMapName = dataMapSchema.getDataMapName();
+
+    String tableUniqueName = 
identifier.getCarbonTableIdentifier().getTableUniqueName();
+    CarbonTable carbonTable = 
CarbonMetadata.getInstance().getCarbonTable(tableUniqueName);
+    if (null == carbonTable) {
+      throw new IOException("Failed to get carbon table with name " + 
tableUniqueName);
+    }
+
+    // columns that will be indexed
+    List<CarbonColumn> allColumns = 
carbonTable.getCreateOrderColumn(identifier.getTableName());
+    List<String> minMaxCols = (List) CollectionUtils.collect(allColumns, new 
Transformer() {
+      @Override public Object transform(Object o) {
+        return ((CarbonColumn) o).getColName();
+      }
+    });
+    LOGGER.info("MinMaxDataMap support index columns: " + 
StringUtils.join(minMaxCols, ", "));
+
+    // operations that will be supported on the indexed columns
+    List<ExpressionType> optOperations = new ArrayList<>();
+    optOperations.add(ExpressionType.EQUALS);
+    optOperations.add(ExpressionType.GREATERTHAN);
+    optOperations.add(ExpressionType.GREATERTHAN_EQUALTO);
+    optOperations.add(ExpressionType.LESSTHAN);
+    optOperations.add(ExpressionType.LESSTHAN_EQUALTO);
+    optOperations.add(ExpressionType.NOT_EQUALS);
+    LOGGER.error("MinMaxDataMap support operations: " + 
StringUtils.join(optOperations, ", "));
+    this.dataMapMeta = new DataMapMeta(minMaxCols, optOperations);
   }
 
   /**
@@ -55,7 +97,7 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
    * @return
    */
   @Override public DataMapWriter createWriter(Segment segment, String 
writeDirectoryPath) {
-    return new MinMaxDataWriter(identifier, segment, writeDirectoryPath);
+    return new MinMaxDataWriter(identifier, dataMapName, segment, 
writeDirectoryPath);
   }
 
   /**
@@ -75,7 +117,10 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
     MinMaxIndexDataMap dataMap = new MinMaxIndexDataMap();
     try {
       dataMap.init(new DataMapModel(
-          CarbonTablePath.getSegmentPath(identifier.getTablePath(), 
segment.getSegmentNo())));
+          MinMaxDataWriter.genDataMapStorePath(
+              CarbonTablePath.getSegmentPath(
+                  identifier.getTablePath(), segment.getSegmentNo()),
+              dataMapName)));
     } catch (MemoryException ex) {
       throw new IOException(ex);
     }
@@ -108,7 +153,7 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
   @Override public List<CoarseGrainDataMap> getDataMaps(DataMapDistributable 
distributable,
       ReadCommittedScope readCommittedScope)
       throws IOException {
-    return null;
+    return getDataMaps(distributable.getSegment(), readCommittedScope);
   }
 
   @Override public void fireEvent(Event event) {
@@ -116,7 +161,6 @@ public class MinMaxIndexDataMapFactory extends 
CoarseGrainDataMapFactory {
   }
 
   @Override public DataMapMeta getMeta() {
-    return new DataMapMeta(new ArrayList<String>(Arrays.asList("c2")),
-        new ArrayList<ExpressionType>());
+    return this.dataMapMeta;
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
 
b/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
deleted file mode 100644
index 59872aa..0000000
--- 
a/datamap/examples/src/minmaxdatamap/test/scala/minmaxdatamaptestcase/MinMaxDataMapExample.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.datamap.examples
-
-import java.io.File
-
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
-
-object MinMaxDataMapExample {
-  def main(args: Array[String]): Unit = {
-
-    val rootPath = new File(this.getClass.getResource("/").getPath
-                            + "").getCanonicalPath
-    val storeLocation = s"$rootPath/dataMap/examples/target/store"
-    val warehouse = s"$rootPath/datamap/examples/target/warehouse"
-    val metastoredb = s"$rootPath/datamap/examples/target"
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
-
-    import org.apache.spark.sql.CarbonSession._
-
-    val spark = SparkSession
-      .builder()
-      .master("local")
-      .appName("CarbonDataMapExample")
-      .config("spark.sql.warehouse.dir", warehouse)
-      .getOrCreateCarbonSession(storeLocation)
-
-    spark.sparkContext.setLogLevel("ERROR")
-    import spark.implicits._
-
-    // register datamap writer
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation, "default", "carbonminmax"),
-      classOf[MinMaxIndexDataMapFactory].getName,
-      MinMaxIndexDataMap.NAME)
-
-    spark.sql("DROP TABLE IF EXISTS carbonminmax")
-
-    val df = spark.sparkContext.parallelize(1 to 33000)
-      .map(x => ("a", "b", x))
-      .toDF("c1", "c2", "c3")
-
-    // save dataframe to carbon file
-    df.write
-      .format("carbondata")
-      .option("tableName", "carbonminmax")
-      .mode(SaveMode.Overwrite)
-      .save()
-
-    // Query the table.
-    spark.sql("select c2 from carbonminmax").show(20, false)
-    spark.sql("select c2 from carbonminmax where c2 = 'b'").show(20, false)
-    spark.sql("DROP TABLE IF EXISTS carbonminmax")
-
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
----------------------------------------------------------------------
diff --git 
a/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
 
b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
new file mode 100644
index 0000000..429dac2
--- /dev/null
+++ 
b/datamap/examples/src/minmaxdatamap/test/scala/org/apache/carbondata/datamap/examples/MinMaxDataMapSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.datamap.examples
+
+import java.io.{File, PrintWriter}
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class MinMaxDataMapSuite extends QueryTest with BeforeAndAfterAll {
+  val inputFile = s"$resourcesPath/minmax_datamap_input.csv"
+  val normalTable = "carbonNormal"
+  val minMaxDMSampleTable = "carbonMinMax"
+  val dataMapName = "minmax_dm"
+  val lineNum = 500000
+
+  override protected def beforeAll(): Unit = {
+    createFile(inputFile, line = lineNum, start = 0)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $minMaxDMSampleTable")
+  }
+  
+  test("test minmax datamap") {
+    sql(
+      s"""
+         | CREATE TABLE $normalTable(id INT, name STRING, city STRING, age INT,
+         | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, 
s7 STRING, s8 STRING)
+         | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+         |  """.stripMargin)
+    sql(
+      s"""
+        | CREATE TABLE $minMaxDMSampleTable(id INT, name STRING, city STRING, 
age INT,
+        | s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 
STRING, s8 STRING)
+        | STORED BY 'carbondata' TBLPROPERTIES('table_blocksize'='128')
+        |  """.stripMargin)
+    sql(
+      s"""
+        | CREATE DATAMAP $dataMapName ON TABLE $minMaxDMSampleTable
+        | USING '${classOf[MinMaxIndexDataMapFactory].getName}'
+      """.stripMargin)
+
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $minMaxDMSampleTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $normalTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+
+    sql(s"show datamap on table $minMaxDMSampleTable").show(false)
+    checkAnswer(sql(s"show datamap on table $minMaxDMSampleTable"),
+      Row(dataMapName, classOf[MinMaxIndexDataMapFactory].getName, "(NA)"))
+    // note that the table will use default dimension as sort_columns, so for 
the following cases,
+    // the pruning result will differ.
+    // 1 blocklet
+    checkAnswer(sql(s"select * from $minMaxDMSampleTable where id = 1"),
+      sql(s"select * from $normalTable where id = 1"))
+    // 6 blocklet
+    checkAnswer(sql(s"select * from $minMaxDMSampleTable where id = 999"),
+      sql(s"select * from $normalTable where id = 999"))
+    // 1 blocklet
+    checkAnswer(sql(s"select * from $minMaxDMSampleTable where city = 
'city_1'"),
+      sql(s"select * from $normalTable where city = 'city_1'"))
+    // 1 blocklet
+    checkAnswer(sql(s"select * from $minMaxDMSampleTable where city = 
'city_999'"),
+      sql(s"select * from $normalTable where city = 'city_999'"))
+    // 6 blocklet
+    checkAnswer(sql(s"select count(distinct id), count(distinct name), 
count(distinct city)," +
+                    s" count(distinct s1), count(distinct s2) from 
$minMaxDMSampleTable"),
+      sql(s"select count(distinct id), count(distinct name), count(distinct 
city)," +
+          s" count(distinct s1), count(distinct s2) from $normalTable"))
+    // 6 blocklet
+    checkAnswer(sql(s"select min(id), max(id), min(name), max(name), 
min(city), max(city)" +
+                    s" from $minMaxDMSampleTable"),
+      sql(s"select min(id), max(id), min(name), max(name), min(city), 
max(city)" +
+          s" from $normalTable"))
+  }
+
+  override protected def afterAll(): Unit = {
+    deleteFile(inputFile)
+    sql(s"DROP TABLE IF EXISTS $normalTable")
+    sql(s"DROP TABLE IF EXISTS $minMaxDMSampleTable")
+  }
+
+  private def createFile(fileName: String, line: Int = 10000, start: Int = 0) 
= {
+    if (!new File(fileName).exists()) {
+      val write = new PrintWriter(new File(fileName))
+      for (i <- start until (start + line)) {
+        write.println(
+          s"$i,n$i,city_$i,${ Random.nextInt(80) }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }," +
+          s"${ UUID.randomUUID().toString },${ UUID.randomUUID().toString }")
+      }
+      write.close()
+    }
+  }
+
+  private def deleteFile(fileName: String): Unit = {
+    val file = new File(fileName)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ecd6c0c5/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
----------------------------------------------------------------------
diff --git 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
index 86b2382..4286e5a 100644
--- 
a/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
+++ 
b/datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
@@ -114,7 +114,7 @@ public class LuceneDataMapWriter extends DataMapWriter {
     FileSystem fs = FileFactory.getFileSystem(indexPath);
 
     // if index path not exists, create it
-    if (fs.exists(indexPath)) {
+    if (!fs.exists(indexPath)) {
       fs.mkdirs(indexPath);
     }
 

Reply via email to