Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2254#discussion_r185434672
--- Diff:
datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapWriter.java
---
@@ -21,142 +21,124 @@
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapMeta;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.dev.DataMapWriter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import
org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.CarbonUtil;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
/**
- * BloomDataMap is constructed in blocklet level. For each indexed column,
a bloom filter is
- * constructed to indicate whether a value belongs to this blocklet. Bloom
filter of blocklet that
- * belongs to same block will be written to one index file suffixed with
.bloomindex. So the number
+ * BloomDataMap is constructed in CG level (blocklet level).
+ * For each indexed column, a bloom filter is constructed to indicate
whether a value
+ * belongs to this blocklet. Bloom filter of blocklet that belongs to same
block will
+ * be written to one index file suffixed with .bloomindex. So the number
* of bloom index file will be equal to that of the blocks.
*/
@InterfaceAudience.Internal
public class BloomDataMapWriter extends DataMapWriter {
- private String dataMapName;
- private List<String> indexedColumns;
+ private static final LogService LOG = LogServiceFactory.getLogService(
+ BloomDataMapWriter.class.getCanonicalName());
private int bloomFilterSize;
- // map column name to ordinal in pages
- private Map<String, Integer> col2Ordianl;
- private Map<String, DataType> col2DataType;
- private String indexShardName;
- private int currentBlockletId;
+ protected int currentBlockletId;
private List<String> currentDMFiles;
private List<DataOutputStream> currentDataOutStreams;
private List<ObjectOutputStream> currentObjectOutStreams;
private List<BloomFilter<byte[]>> indexBloomFilters;
- @InterfaceAudience.Internal
- public BloomDataMapWriter(AbsoluteTableIdentifier identifier,
DataMapMeta dataMapMeta,
- int bloomFilterSize, Segment segment, String writeDirectoryPath) {
- super(identifier, segment, writeDirectoryPath);
- dataMapName = dataMapMeta.getDataMapName();
- indexedColumns = dataMapMeta.getIndexedColumns();
+ BloomDataMapWriter(String tablePath, String dataMapName,
List<CarbonColumn> indexColumns,
+ Segment segment, String shardName, int bloomFilterSize) throws
IOException {
+ super(tablePath, dataMapName, indexColumns, segment, shardName);
this.bloomFilterSize = bloomFilterSize;
- col2Ordianl = new HashMap<String, Integer>(indexedColumns.size());
- col2DataType = new HashMap<String, DataType>(indexedColumns.size());
- currentDMFiles = new ArrayList<String>(indexedColumns.size());
- currentDataOutStreams = new
ArrayList<DataOutputStream>(indexedColumns.size());
- currentObjectOutStreams = new
ArrayList<ObjectOutputStream>(indexedColumns.size());
-
- indexBloomFilters = new
ArrayList<BloomFilter<byte[]>>(indexedColumns.size());
+ currentDMFiles = new ArrayList<String>(indexColumns.size());
+ currentDataOutStreams = new
ArrayList<DataOutputStream>(indexColumns.size());
+ currentObjectOutStreams = new
ArrayList<ObjectOutputStream>(indexColumns.size());
+ indexBloomFilters = new
ArrayList<BloomFilter<byte[]>>(indexColumns.size());
+ initDataMapFile();
+ resetBloomFilters();
}
@Override
- public void onBlockStart(String blockId, String indexShardName) throws
IOException {
- if (this.indexShardName == null) {
- this.indexShardName = indexShardName;
- initDataMapFile();
- }
+ public void onBlockStart(String blockId) throws IOException {
}
@Override
public void onBlockEnd(String blockId) throws IOException {
-
}
@Override
public void onBlockletStart(int blockletId) {
- this.currentBlockletId = blockletId;
+ }
+
+ protected void resetBloomFilters() {
indexBloomFilters.clear();
- for (int i = 0; i < indexedColumns.size(); i++) {
+ List<CarbonColumn> indexColumns = getIndexColumns();
+ for (int i = 0; i < indexColumns.size(); i++) {
indexBloomFilters.add(BloomFilter.create(Funnels.byteArrayFunnel(),
bloomFilterSize, 0.00001d));
}
}
@Override
public void onBlockletEnd(int blockletId) {
- try {
- writeBloomDataMapFile();
- } catch (Exception e) {
- for (ObjectOutputStream objectOutputStream :
currentObjectOutStreams) {
- CarbonUtil.closeStreams(objectOutputStream);
- }
- for (DataOutputStream dataOutputStream : currentDataOutStreams) {
- CarbonUtil.closeStreams(dataOutputStream);
- }
- throw new RuntimeException(e);
- }
+ writeBloomDataMapFile();
+ currentBlockletId++;
}
- // notice that the input pages only contains the indexed columns
@Override
- public void onPageAdded(int blockletId, int pageId, ColumnPage[] pages)
- throws IOException {
- col2Ordianl.clear();
- col2DataType.clear();
- for (int colId = 0; colId < pages.length; colId++) {
- String columnName =
pages[colId].getColumnSpec().getFieldName().toLowerCase();
- col2Ordianl.put(columnName, colId);
- DataType columnType =
pages[colId].getColumnSpec().getSchemaDataType();
- col2DataType.put(columnName, columnType);
- }
+ public void addRow(int blockletId, int pageId, int rowId, CarbonRow row)
{
+ addRow(row.getData());
+ }
- // for each row
- for (int rowId = 0; rowId < pages[0].getPageSize(); rowId++) {
- // for each indexed column
- for (int indexColId = 0; indexColId < indexedColumns.size();
indexColId++) {
- String indexedCol = indexedColumns.get(indexColId);
- byte[] indexValue;
- if (DataTypes.STRING == col2DataType.get(indexedCol)
- || DataTypes.BYTE_ARRAY == col2DataType.get(indexedCol)) {
- byte[] originValue = (byte[])
pages[col2Ordianl.get(indexedCol)].getData(rowId);
- indexValue = new byte[originValue.length - 2];
- System.arraycopy(originValue, 2, indexValue, 0,
originValue.length - 2);
- } else {
- Object originValue =
pages[col2Ordianl.get(indexedCol)].getData(rowId);
- indexValue =
CarbonUtil.getValueAsBytes(col2DataType.get(indexedCol), originValue);
- }
-
- indexBloomFilters.get(indexColId).put(indexValue);
+ protected void addRow(Object[] rowData) {
+ // for each indexed column, add the data to bloom filter
+ List<CarbonColumn> indexColumns = getIndexColumns();
+ for (int i = 0; i < indexColumns.size(); i++) {
+ DataType dataType = indexColumns.get(i).getDataType();
+ byte[] indexValue;
+ if (DataTypes.STRING == dataType) {
--- End diff --
getStringData needs to be overridden by the subclass (BloomDataMapRefresher).
BloomDataMapRefresher gets its row data from
IndexDataMapRefreshRDD.internalCompute, so it needs a different implementation.
---