Repository: carbondata
Updated Branches:
  refs/heads/datamap f4ab1ff69 -> 15036b1d5 (forced update)
Rebased with metadata Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/15036b1d Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/15036b1d Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/15036b1d Branch: refs/heads/datamap Commit: 15036b1d513378bdc392c5bd0f688446ee3bd17c Parents: 2015a3e Author: Ravindra Pesala <[email protected]> Authored: Thu Jul 13 14:18:02 2017 +0530 Committer: Ravindra Pesala <[email protected]> Committed: Thu Jul 27 09:10:21 2017 +0530 ---------------------------------------------------------------------- .../core/indexstore/UnsafeMemoryDMStore.java | 27 +++---- .../blockletindex/BlockletDataMap.java | 11 +-- .../core/indexstore/row/DataMapRow.java | 4 +- .../core/indexstore/row/DataMapRowImpl.java | 4 + .../core/indexstore/row/UnsafeDataMapRow.java | 40 +++++++++- .../core/memory/UnsafeMemoryManager.java | 16 ++-- .../hadoop/api/CarbonTableInputFormat.java | 81 ++++++++++---------- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 2 +- 8 files changed, 113 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java index 8246f99..737586e 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java @@ -19,9 +19,9 @@ package org.apache.carbondata.core.indexstore; import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.UnsafeDataMapRow; 
import org.apache.carbondata.core.indexstore.schema.DataMapSchema; -import org.apache.carbondata.core.memory.MemoryAllocator; -import org.apache.carbondata.core.memory.MemoryAllocatorFactory; import org.apache.carbondata.core.memory.MemoryBlock; +import org.apache.carbondata.core.memory.MemoryException; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; import static org.apache.carbondata.core.memory.CarbonUnsafe.BYTE_ARRAY_OFFSET; import static org.apache.carbondata.core.memory.CarbonUnsafe.unsafe; @@ -39,8 +39,6 @@ public class UnsafeMemoryDMStore { private int runningLength; - private MemoryAllocator memoryAllocator; - private boolean isMemoryFreed; private DataMapSchema[] schema; @@ -49,11 +47,10 @@ public class UnsafeMemoryDMStore { private int rowCount; - public UnsafeMemoryDMStore(DataMapSchema[] schema) { + public UnsafeMemoryDMStore(DataMapSchema[] schema) throws MemoryException { this.schema = schema; - this.memoryAllocator = MemoryAllocatorFactory.INSATANCE.getMemoryAllocator(); this.allocatedSize = capacity; - this.memoryBlock = memoryAllocator.allocate(allocatedSize); + this.memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(allocatedSize); this.pointers = new int[1000]; } @@ -63,13 +60,13 @@ public class UnsafeMemoryDMStore { * * @param rowSize */ - private void ensureSize(int rowSize) { + private void ensureSize(int rowSize) throws MemoryException { if (runningLength + rowSize >= allocatedSize) { MemoryBlock allocate = - MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(allocatedSize + capacity); + UnsafeMemoryManager.allocateMemoryWithRetry(allocatedSize + capacity); unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - memoryAllocator.free(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock); allocatedSize = allocatedSize + capacity; memoryBlock = allocate; } @@ -86,7 +83,7 @@ public class UnsafeMemoryDMStore { 
* @param indexRow * @return */ - public void addIndexRowToUnsafe(DataMapRow indexRow) { + public void addIndexRowToUnsafe(DataMapRow indexRow) throws MemoryException { // First calculate the required memory to keep the row in unsafe int rowSize = indexRow.getTotalSizeInBytes(); // Check whether allocated memory is sufficient or not. @@ -168,13 +165,13 @@ public class UnsafeMemoryDMStore { return new UnsafeDataMapRow(schema, memoryBlock, pointers[index]); } - public void finishWriting() { + public void finishWriting() throws MemoryException { if (runningLength < allocatedSize) { MemoryBlock allocate = - MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(runningLength); + UnsafeMemoryManager.allocateMemoryWithRetry(runningLength); unsafe.copyMemory(memoryBlock.getBaseObject(), memoryBlock.getBaseOffset(), allocate.getBaseObject(), allocate.getBaseOffset(), runningLength); - memoryAllocator.free(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock); memoryBlock = allocate; } // Compact pointers. 
@@ -187,7 +184,7 @@ public class UnsafeMemoryDMStore { public void freeMemory() { if (!isMemoryFreed) { - memoryAllocator.free(memoryBlock); + UnsafeMemoryManager.INSTANCE.freeMemory(memoryBlock); isMemoryFreed = true; } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index 79aa091..680852d 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.row.DataMapRow; import org.apache.carbondata.core.indexstore.row.DataMapRowImpl; import org.apache.carbondata.core.indexstore.schema.DataMapSchema; import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.metadata.blocklet.BlockletInfo; import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; @@ -110,7 +111,7 @@ public class BlockletDataMap implements DataMap, Cacheable { if (unsafeMemoryDMStore != null) { unsafeMemoryDMStore.finishWriting(); } - } catch (IOException e) { + } catch (Exception e) { throw new RuntimeException(e); } } @@ -156,11 +157,11 @@ public class BlockletDataMap implements DataMap, Cacheable { DataOutput dataOutput = new DataOutputStream(stream); blockletInfo.write(dataOutput); serializedData = stream.toByteArray(); - } catch (IOException e) { + row.setByteArray(serializedData, ordinal); + unsafeMemoryDMStore.addIndexRowToUnsafe(row); + } 
catch (Exception e) { throw new RuntimeException(e); } - row.setByteArray(serializedData, ordinal); - unsafeMemoryDMStore.addIndexRowToUnsafe(row); } } @@ -176,7 +177,7 @@ public class BlockletDataMap implements DataMap, Cacheable { return minRow; } - private void createSchema(SegmentProperties segmentProperties) { + private void createSchema(SegmentProperties segmentProperties) throws MemoryException { List<DataMapSchema> indexSchemas = new ArrayList<>(); // Index key http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java index defe766..631e0ad 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java @@ -62,6 +62,8 @@ public abstract class DataMapRow { public abstract double getDouble(int ordinal); + public abstract int getLengthInBytes(int ordinal); + public int getTotalSizeInBytes() { int len = 0; for (int i = 0; i < schemas.length; i++) { @@ -75,7 +77,7 @@ public abstract class DataMapRow { case FIXED: return schemas[ordinal].getLength(); case VARIABLE: - return getByteArray(ordinal).length + 2; + return getLengthInBytes(ordinal) + 2; case STRUCT: return getRow(ordinal).getTotalSizeInBytes(); default: http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java index adec346..32d15d3 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java @@ -35,6 +35,10 @@ public class DataMapRowImpl extends DataMapRow { return (byte[]) data[ordinal]; } + @Override public int getLengthInBytes(int ordinal) { + return ((byte[]) data[ordinal]).length; + } + @Override public DataMapRow getRow(int ordinal) { return (DataMapRow) data[ordinal]; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java index ef78514..c398115 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java @@ -55,6 +55,31 @@ public class UnsafeDataMapRow extends DataMapRow { return data; } + @Override public int getLengthInBytes(int ordinal) { + int length; + int position = getPosition(ordinal); + switch (schemas[ordinal].getSchemaType()) { + case VARIABLE: + length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position); + break; + default: + length = schemas[ordinal].getLength(); + } + return length; + } + + private int getLengthInBytes(int ordinal, int position) { + int length; + switch (schemas[ordinal].getSchemaType()) { + case VARIABLE: + length = unsafe.getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position); + break; + default: + length = schemas[ordinal].getLength(); + } + return length; + } + @Override public DataMapRow getRow(int ordinal) { DataMapSchema[] childSchemas = ((DataMapSchema.StructDataMapSchema) schemas[ordinal]).getChildSchemas(); @@ -123,10 
+148,23 @@ public class UnsafeDataMapRow extends DataMapRow { throw new UnsupportedOperationException("Not supported to set on unsafe row"); } + private int getSizeInBytes(int ordinal, int position) { + switch (schemas[ordinal].getSchemaType()) { + case FIXED: + return schemas[ordinal].getLength(); + case VARIABLE: + return getLengthInBytes(ordinal, position) + 2; + case STRUCT: + return getRow(ordinal).getTotalSizeInBytes(); + default: + throw new UnsupportedOperationException("wrong type"); + } + } + private int getPosition(int ordinal) { int position = 0; for (int i = 0; i < ordinal; i++) { - position += getSizeInBytes(i); + position += getSizeInBytes(i, position); } return position; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java index 28e63a9..c491908 100644 --- a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java +++ b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java @@ -101,11 +101,9 @@ public class UnsafeMemoryManager { if (memoryUsed + memoryRequested <= totalMemory) { MemoryBlock allocate = allocator.allocate(memoryRequested); memoryUsed += allocate.size(); - if (LOGGER.isDebugEnabled()) { - set.add(allocate); - LOGGER.error("Memory block (" + allocate + ") is created with size " + allocate.size() + - ". Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes"); - } + set.add(allocate); + LOGGER.info("Memory block (" + allocate + ") is created with size " + allocate.size() + + ". 
Total memory used " + memoryUsed + "Bytes, left " + getAvailableMemory() + "Bytes"); return allocate; } return null; @@ -115,11 +113,9 @@ public class UnsafeMemoryManager { allocator.free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; - if (LOGGER.isDebugEnabled()) { - set.remove(memoryBlock); - LOGGER.error("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed + - "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size()); - } + set.remove(memoryBlock); + LOGGER.info("Memory block (" + memoryBlock + ") released. Total memory used " + memoryUsed + + "Bytes, left " + getAvailableMemory() + "Bytes. Total allocated block: " + set.size()); } private synchronized long getAvailableMemory() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index e73c04a..8938699 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -38,6 +38,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.schema.PartitionInfo; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.SegmentUpdateDetails; import org.apache.carbondata.core.mutate.UpdateVO; @@ -99,59 +100,56 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> { private static final String FILTER_PREDICATE = "mapreduce.input.carboninputformat.filter.predicate"; private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection"; - private static final String CARBON_TABLE = "mapreduce.input.carboninputformat.table"; + private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo"; private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport"; + // a cache for carbon table, it will be used in task side + private CarbonTable carbonTable; + /** - * It is optional, if user does not set then it reads from store - * - * @param configuration - * @param carbonTable - * @throws IOException + * Set the `tableInfo` in `configuration` */ - public static void setCarbonTable(Configuration configuration, CarbonTable carbonTable) + public static void setTableInfo(Configuration configuration, TableInfo tableInfo) throws IOException { - if (null != carbonTable) { - configuration.set(CARBON_TABLE, ObjectSerializationUtil.convertObjectToString(carbonTable)); + if (null != tableInfo) { + configuration.set(TABLE_INFO, ObjectSerializationUtil.convertObjectToString(tableInfo)); } } - public static CarbonTable getCarbonTable(Configuration configuration) throws IOException { - String carbonTableStr = configuration.get(CARBON_TABLE); - if (carbonTableStr == null) { - populateCarbonTable(configuration); - // read it from schema file in the store - carbonTableStr = configuration.get(CARBON_TABLE); - return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); - } - return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); + /** + * Get TableInfo object from `configuration` + */ + private TableInfo getTableInfo(Configuration configuration) throws IOException { + String tableInfoStr = configuration.get(TABLE_INFO); + return (TableInfo) 
ObjectSerializationUtil.convertStringToObject(tableInfoStr); } /** - * this method will read the schema from the physical file and populate into CARBON_TABLE - * - * @param configuration - * @throws IOException + * Get the cached CarbonTable or create it by TableInfo in `configuration` */ - private static void populateCarbonTable(Configuration configuration) throws IOException { - String dirs = configuration.get(INPUT_DIR, ""); - String[] inputPaths = StringUtils.split(dirs); - if (inputPaths.length == 0) { - throw new InvalidPathException("No input paths specified in job"); + private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException { + if (carbonTable == null) { + // carbon table should be created either from deserialized table info (schema saved in + // hive metastore) or by reading schema in HDFS (schema saved in HDFS) + TableInfo tableInfo = getTableInfo(configuration); + CarbonTable carbonTable; + if (tableInfo != null) { + carbonTable = CarbonTable.buildFromTableInfo(tableInfo); + } else { + carbonTable = SchemaReader.readCarbonTableFromStore( + getAbsoluteTableIdentifier(configuration)); + } + this.carbonTable = carbonTable; + return carbonTable; + } else { + return this.carbonTable; } - AbsoluteTableIdentifier absoluteTableIdentifier = - AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); - // read the schema file to get the absoluteTableIdentifier having the correct table id - // persisted in the schema - CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier); - setCarbonTable(configuration, carbonTable); } public static void setTablePath(Configuration configuration, String tablePath) throws IOException { configuration.set(FileInputFormat.INPUT_DIR, tablePath); } - /** * It sets unresolved filter expression. 
* @@ -213,9 +211,14 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { configuration.set(INPUT_FILES, CarbonUtil.getSegmentString(validFiles)); } - private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) + private AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) throws IOException { - return getCarbonTable(configuration).getAbsoluteTableIdentifier(); + String dirs = configuration.get(INPUT_DIR, ""); + String[] inputPaths = StringUtils.split(dirs); + if (inputPaths.length == 0) { + throw new InvalidPathException("No input paths specified in job"); + } + return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); } /** @@ -262,7 +265,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // process and resolve the expression Expression filter = getFilterPredicates(job.getConfiguration()); - CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); + CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration()); // this will be null in case of corrupt schema file. 
if (null == carbonTable) { throw new IOException("Missing/Corrupt schema file for table."); @@ -320,7 +323,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { Boolean isIUDTable = false; AbsoluteTableIdentifier absoluteTableIdentifier = - getCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); + getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier(); SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier); @@ -432,7 +435,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { public QueryModel getQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); - CarbonTable carbonTable = getCarbonTable(configuration); + CarbonTable carbonTable = getOrCreateCarbonTable(configuration); // getting the table absoluteTableIdentifier from the carbonTable // to avoid unnecessary deserialization AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/15036b1d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 6bc7564..2e737ab 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -265,7 +265,7 @@ class CarbonMergerRDD[K, V]( val jobConf: JobConf = new JobConf(new Configuration) val job: Job = new Job(jobConf) val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) - 
CarbonInputFormat.setTableInfo(job.getConfiguration, + CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo) var updateDetails: UpdateVO = null // initialise query_id for job
