Github user xuchuanyin commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2275#discussion_r187769203
--- Diff:
datamap/lucene/src/main/java/org/apache/carbondata/datamap/lucene/LuceneDataMapWriter.java
---
@@ -242,74 +258,216 @@ private boolean addField(Document doc, Object data,
CarbonColumn column, Field.S
if (store == Field.Store.YES) {
doc.add(new StoredField(fieldName, (int) value));
}
- } else if (type == DataTypes.INT) {
+ } else if (key instanceof Integer) {
// int type , use int point to deal with int type
- int value = (int) data;
- doc.add(new IntPoint(fieldName, value));
+ int value = (Integer) key;
+ doc.add(new IntPoint(fieldName, new int[] { value }));
// if need store it , add StoredField
if (store == Field.Store.YES) {
doc.add(new StoredField(fieldName, value));
}
- } else if (type == DataTypes.LONG) {
+ } else if (key instanceof Long) {
// long type , use long point to deal with long type
- long value = (long) data;
- doc.add(new LongPoint(fieldName, value));
+ long value = (Long) key;
+ doc.add(new LongPoint(fieldName, new long[] { value }));
// if need store it , add StoredField
if (store == Field.Store.YES) {
doc.add(new StoredField(fieldName, value));
}
- } else if (type == DataTypes.FLOAT) {
- float value = (float) data;
- doc.add(new FloatPoint(fieldName, value));
+ } else if (key instanceof Float) {
+ float value = (Float) key;
+ doc.add(new FloatPoint(fieldName, new float[] { value }));
if (store == Field.Store.YES) {
doc.add(new FloatPoint(fieldName, value));
}
- } else if (type == DataTypes.DOUBLE) {
- double value = (double) data;
- doc.add(new DoublePoint(fieldName, value));
+ } else if (key instanceof Double) {
+ double value = (Double) key;
+ doc.add(new DoublePoint(fieldName, new double[] { value }));
if (store == Field.Store.YES) {
doc.add(new DoublePoint(fieldName, value));
}
+ } else if (key instanceof String) {
+ String strValue = (String) key;
+ doc.add(new TextField(fieldName, strValue, store));
+ } else if (key instanceof Boolean) {
+ boolean value = (Boolean) key;
+ IntRangeField field = new IntRangeField(fieldName, new int[] { 0 },
new int[] { 1 });
+ field.setIntValue(value ? 1 : 0);
+ doc.add(field);
+ if (store == Field.Store.YES) {
+ doc.add(new StoredField(fieldName, value ? 1 : 0));
+ }
+ }
+ return true;
+ }
+
+ private Object getValue(ColumnPage page, int rowId) {
+
+ //get field type
+ DataType type = page.getColumnSpec().getSchemaDataType();
+ Object value = null;
+ if (type == DataTypes.BYTE) {
+ // byte type , use int range to deal with byte, lucene has no byte
type
+ value = page.getByte(rowId);
+ } else if (type == DataTypes.SHORT) {
+ // short type , use int range to deal with short type, lucene has no
short type
+ value = page.getShort(rowId);
+ } else if (type == DataTypes.INT) {
+ // int type , use int point to deal with int type
+ value = page.getInt(rowId);
+ } else if (type == DataTypes.LONG) {
+ // long type , use long point to deal with long type
+ value = page.getLong(rowId);
+ } else if (type == DataTypes.FLOAT) {
+ value = page.getFloat(rowId);
+ } else if (type == DataTypes.DOUBLE) {
+ value = page.getDouble(rowId);
} else if (type == DataTypes.STRING) {
- byte[] value = (byte[]) data;
+ byte[] bytes = page.getBytes(rowId);
// TODO: how to get string value
- String strValue = null;
try {
- strValue = new String(value, 2, value.length - 2, "UTF-8");
+ value = new String(bytes, 2, bytes.length - 2, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
- doc.add(new TextField(fieldName, strValue, store));
} else if (type == DataTypes.DATE) {
throw new RuntimeException("unsupported data type " + type);
} else if (type == DataTypes.TIMESTAMP) {
throw new RuntimeException("unsupported data type " + type);
} else if (type == DataTypes.BOOLEAN) {
- boolean value = (boolean) data;
- IntRangeField field = new IntRangeField(fieldName, new int[] { 0 },
new int[] { 1 });
- field.setIntValue(value ? 1 : 0);
- doc.add(field);
- if (store == Field.Store.YES) {
- doc.add(new StoredField(fieldName, value ? 1 : 0));
- }
+ value = page.getBoolean(rowId);
} else {
LOGGER.error("unsupport data type " + type);
throw new RuntimeException("unsupported data type " + type);
}
- return true;
+ return value;
+ }
+
+ public static void addToCache(LuceneColumnKeys key, int rowId, int
pageId, int blockletId,
+ Map<LuceneColumnKeys, Map<Integer, RoaringBitmap>> cache, ByteBuffer
intBuffer,
+ boolean storeBlockletWise) {
+ Map<Integer, RoaringBitmap> setMap = cache.get(key);
+ if (setMap == null) {
+ setMap = new HashMap<>();
+ cache.put(key, setMap);
+ }
+ int combinKey;
+ if (!storeBlockletWise) {
+ intBuffer.clear();
+ intBuffer.putShort((short) blockletId);
+ intBuffer.putShort((short) pageId);
+ intBuffer.rewind();
+ combinKey = intBuffer.getInt();
+ } else {
+ combinKey = pageId;
+ }
+ RoaringBitmap bitSet = setMap.get(combinKey);
+ if (bitSet == null) {
+ bitSet = new RoaringBitmap();
+ setMap.put(combinKey, bitSet);
+ }
+ bitSet.add(rowId);
+ }
+
+ public static void addData(LuceneColumnKeys key, int rowId, int pageId,
int blockletId,
+ ByteBuffer intBuffer, IndexWriter indexWriter, List<CarbonColumn>
indexCols,
+ boolean storeBlockletWise) throws IOException {
+
+ Document document = new Document();
+ for (int i = 0; i < key.getColValues().length; i++) {
+ addField(document, key.getColValues()[i],
indexCols.get(i).getColName(), Field.Store.NO);
+ }
+ intBuffer.clear();
+ if (storeBlockletWise) {
+ // No need to store blocklet id to it.
+ intBuffer.putShort((short) pageId);
+ intBuffer.putShort((short) rowId);
+ intBuffer.rewind();
+ document.add(new StoredField(ROWID_NAME, intBuffer.getInt()));
+ } else {
+ intBuffer.putShort((short) blockletId);
+ intBuffer.putShort((short) pageId);
+ intBuffer.rewind();
+ document.add(new StoredField(PAGEID_NAME, intBuffer.getInt()));
+ document.add(new StoredField(ROWID_NAME, (short) rowId));
+ }
+ indexWriter.addDocument(document);
+ }
+
+ private void flushCacheIfCan() throws IOException {
+ if (cache.size() > cacheSize) {
+ flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
+ }
+ }
+
+ public static void flushCache(Map<LuceneColumnKeys, Map<Integer,
RoaringBitmap>> cache,
+ List<CarbonColumn> indexCols, IndexWriter indexWriter, boolean
storeBlockletWise)
+ throws IOException {
+ for (Map.Entry<LuceneColumnKeys, Map<Integer, RoaringBitmap>> entry :
cache.entrySet()) {
+ Document document = new Document();
+ LuceneColumnKeys key = entry.getKey();
+ for (int i = 0; i < key.getColValues().length; i++) {
+ addField(document, key.getColValues()[i],
indexCols.get(i).getColName(), Field.Store.NO);
+ }
+ Map<Integer, RoaringBitmap> value = entry.getValue();
+ int count = 0;
+ for (Map.Entry<Integer, RoaringBitmap> pageData : value.entrySet()) {
+ RoaringBitmap bitMap = pageData.getValue();
+ int cardinality = bitMap.getCardinality();
+ // Each row is short and pageid is stored in int
+ ByteBuffer byteBuffer = ByteBuffer.allocate(cardinality * 2 + 4);
+ if (!storeBlockletWise) {
+ byteBuffer.putInt(pageData.getKey());
+ } else {
+ byteBuffer.putShort(pageData.getKey().shortValue());
+ }
+ IntIterator intIterator = bitMap.getIntIterator();
+ while (intIterator.hasNext()) {
+ byteBuffer.putShort((short) intIterator.next());
+ }
+ document.add(new StoredField(PAGEID_NAME + count,
byteBuffer.array()));
+ count++;
+ }
+ indexWriter.addDocument(document);
+ }
+ cache.clear();
}
/**
* This is called during closing of writer.So after this call no more
data will be sent to this
* class.
*/
public void finish() throws IOException {
+ flushCache(cache, getIndexColumns(), indexWriter, storeBlockletWise);
// finished a file , close this index writer
if (indexWriter != null) {
indexWriter.close();
}
}
+ public static class LuceneColumnKeys {
+
+ private Object[] colValues;
+
+ public LuceneColumnKeys(int size) {
+ colValues = new Object[size];
+ }
+
+ public Object[] getColValues() {
+ return colValues;
+ }
+
+ @Override public boolean equals(Object o) {
--- End diff --
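Please move the `@Override` annotation onto its own line, directly above the method declaration. There are other occurrences of this in the class that need the same change.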
---