[CARBONDATA-2723][DataMap] Fix bugs in recreating a datamap on a table. When a datamap or table is dropped, the executor-side datamap cache becomes stale. If the datamap is then recreated with different index columns, this stale cache must be cleared during data loading; otherwise the DataMapWriterListener will not take effect for the new datamap.
This closes #2486 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/98c75819 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/98c75819 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/98c75819 Branch: refs/heads/carbonstore Commit: 98c758190ed27eaf4f874a9445900960c4523251 Parents: bd02656 Author: xuchuanyin <[email protected]> Authored: Wed Jul 11 12:23:18 2018 +0800 Committer: ravipesala <[email protected]> Committed: Fri Jul 13 15:44:20 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/DataMapMeta.java | 13 ++++++---- .../core/datamap/DataMapStoreManager.java | 2 ++ .../table/DiskBasedDMSchemaStorageProvider.java | 25 ++++++++++---------- .../bloom/AbstractBloomDataMapWriter.java | 4 ---- .../datamap/bloom/BloomDataMapBuilder.java | 1 - .../datamap/DataMapWriterListener.java | 3 +++ .../store/writer/AbstractFactDataWriter.java | 1 + 7 files changed, 27 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java index adf85d8..93a8012 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapMeta.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.scan.filter.intf.ExpressionType; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.Transformer; +import org.apache.commons.lang3.StringUtils; /** * Metadata of the datamap, set by DataMap developer @@ -71,9 +72,13 @@ public 
class DataMapMeta { return optimizedOperation; } - @Override - public String toString() { - return "DataMapMeta{" + "dataMapName='" + dataMapName + '\'' + ", indexedColumns=" - + indexedColumns + ", optimizedOperation=" + optimizedOperation + '}'; + @Override public String toString() { + return new StringBuilder("DataMapMeta{") + .append("dataMapName='").append(dataMapName).append('\'') + .append(", indexedColumns=[") + .append(StringUtils.join(getIndexedColumnNames(), ", ")).append("]\'") + .append(", optimizedOperation=").append(optimizedOperation) + .append('}') + .toString(); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 574b4c6..9a7d1c1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -431,6 +431,7 @@ public final class DataMapStoreManager { } } } + allDataMaps.remove(tableUniqName); } /** @@ -460,6 +461,7 @@ public final class DataMapStoreManager { } i++; } + allDataMaps.put(tableUniqueName, tableIndices); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java index cf4f6b9..f90960b 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java @@ -27,6 +27,7 @@ import java.io.OutputStreamWriter; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Set; @@ -84,10 +85,10 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro if (null != brWriter) { brWriter.flush(); } - checkAndReloadDataMapSchemas(true); dataMapSchemas.add(dataMapSchema); - touchMDTFile(); CarbonUtil.closeStreams(dataOutputStream, brWriter); + checkAndReloadDataMapSchemas(true); + touchMDTFile(); } } @@ -159,19 +160,16 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) { throw new IOException("DataMap with name " + dataMapName + " does not exists in storage"); } - DataMapSchema dataMapSchemaToRemove = null; - for (DataMapSchema dataMapSchema : dataMapSchemas) { - if (dataMapSchema.getDataMapName().equalsIgnoreCase(dataMapName)) { - dataMapSchemaToRemove = dataMapSchema; + Iterator<DataMapSchema> iterator = dataMapSchemas.iterator(); + while (iterator.hasNext()) { + DataMapSchema schema = iterator.next(); + if (schema.getDataMapName().equalsIgnoreCase(dataMapName)) { + iterator.remove(); } } - if (dataMapSchemaToRemove != null) { - dataMapSchemas.remove(dataMapSchemaToRemove); - } + touchMDTFile(); if (!FileFactory.deleteFile(schemaPath, FileFactory.getFileType(schemaPath))) { throw new IOException("DataMap with name " + dataMapName + " cannot be deleted"); - } else { - touchMDTFile(); } } @@ -180,13 +178,13 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro long lastModifiedTime = FileFactory.getCarbonFile(mdtFilePath).getLastModifiedTime(); if 
(this.lastModifiedTime != lastModifiedTime) { dataMapSchemas = retrieveAllSchemasInternal(); - this.lastModifiedTime = lastModifiedTime; + touchMDTFile(); } } else { + dataMapSchemas = retrieveAllSchemasInternal(); if (touchFile) { touchMDTFile(); } - dataMapSchemas = retrieveAllSchemasInternal(); } } @@ -205,6 +203,7 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro } long lastModifiedTime = System.currentTimeMillis(); FileFactory.getCarbonFile(mdtFilePath).setLastModifiedTime(lastModifiedTime); + this.lastModifiedTime = lastModifiedTime; } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java index c160206..fcecc01 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/AbstractBloomDataMapWriter.java @@ -116,7 +116,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter { protected void resetBloomFilters() { indexBloomFilters.clear(); - List<CarbonColumn> indexColumns = getIndexColumns(); int[] stats = calculateBloomStats(); for (int i = 0; i < indexColumns.size(); i++) { indexBloomFilters @@ -225,7 +224,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter { throw new IOException("Failed to create directory " + dataMapPath); } } - List<CarbonColumn> indexColumns = getIndexColumns(); for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { String dmFile = BloomCoarseGrainDataMap.getBloomIndexFile(dataMapPath, indexColumns.get(indexColId).getColName()); @@ -245,7 +243,6 
@@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter { } protected void writeBloomDataMapFile() { - List<CarbonColumn> indexColumns = getIndexColumns(); try { for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { CarbonBloomFilter bloomFilter = indexBloomFilters.get(indexColId); @@ -274,7 +271,6 @@ public abstract class AbstractBloomDataMapWriter extends DataMapWriter { } protected void releaseResouce() { - List<CarbonColumn> indexColumns = getIndexColumns(); for (int indexColId = 0; indexColId < indexColumns.size(); indexColId++) { CarbonUtil.closeStreams( currentDataOutStreams.get(indexColId)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java index f444ab5..7ba8c42 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomDataMapBuilder.java @@ -52,7 +52,6 @@ public class BloomDataMapBuilder extends AbstractBloomDataMapWriter implements D currentBlockletId = blockletId; } // for each indexed column, add the data to bloom filter - List<CarbonColumn> indexColumns = getIndexColumns(); for (int i = 0; i < indexColumns.size(); i++) { Object data = values[i]; addValue2BloomIndex(i, data); http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java 
b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 4207253..55a251d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -61,6 +61,9 @@ public class DataMapWriterListener { */ public void registerAllWriter(CarbonTable carbonTable, String segmentId, String taskNo, SegmentProperties segmentProperties) { + // clear cache in executor side + DataMapStoreManager.getInstance() + .clearDataMaps(carbonTable.getCarbonTableIdentifier().getTableUniqueName()); List<TableDataMap> tableIndices; try { tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable); http://git-wip-us.apache.org/repos/asf/carbondata/blob/98c75819/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 3082b91..3e71e45 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -422,6 +422,7 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { for (int i = 0; i < executorServiceSubmitList.size(); i++) { executorServiceSubmitList.get(i).get(); } + listener = null; } catch (InterruptedException | ExecutionException | IOException e) { throw new CarbonDataWriterException(e); }
