http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java b/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java deleted file mode 100644 index 5837f0c..0000000 --- a/integration/spark-common-test/src/test/java/org/carbondata/integration/spark/testsuite/validation/FileFooterValidator.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.integration.spark.testsuite.validation; - -import org.apache.spark.sql.common.util.CarbonHiveContext; -import org.apache.carbondata.core.carbon.CarbonTableIdentifier; -import org.apache.carbondata.core.util.path.CarbonStorePath; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.FileHolder; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.reader.CarbonFooterReader; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.format.BlockletIndex; -import org.apache.carbondata.format.BlockletInfo; -import org.apache.carbondata.format.DataChunk; -import org.apache.carbondata.format.Encoding; -import org.apache.carbondata.format.FileFooter; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.assertTrue; - -public class FileFooterValidator { - - private static FileFooter fileFooter; - - private static boolean setUpIsDone; - - @Before public void setUp() throws Exception { - - if (setUpIsDone) { - return; - } - CarbonHiveContext.sql( - "CREATE CUBE validatefooter DIMENSIONS (empno Integer, empname String," - + " designation String," - + " doj Timestamp, workgroupcategory Integer, workgroupcategoryname String, " - + "deptno Integer, deptname String, projectcode Integer, projectjoindate Timestamp," - + " projectenddate Timestamp) MEASURES (attendance Integer,utilization Integer," - + "salary Integer) OPTIONS (PARTITIONER [PARTITION_COUNT=1])"); - CarbonHiveContext.sql( - "LOAD DATA fact from './src/test/resources/data.csv' INTO CUBE validatefooter " - + "PARTITIONDATA(DELIMITER ',', QUOTECHAR '\"')"); - String storePath = - CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION); - CarbonTableIdentifier tableIdentifier = - new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "validatefooter", "1"); - String segmentPath = CarbonStorePath.getCarbonTablePath(storePath, tableIdentifier) - .getCarbonDataDirectoryPath("0", "0"); - CarbonFile carbonFile = - FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); - CarbonFile[] list = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) { - return true; - } - return false; - } - }); - - for (CarbonFile file : list) { - String fileLocation = file.getAbsolutePath(); - CarbonFile factFile = - FileFactory.getCarbonFile(fileLocation, FileFactory.getFileType(fileLocation)); - long offset = factFile.getSize() - CarbonCommonConstants.LONG_SIZE_IN_BYTE; - FileHolder fileHolder = FileFactory.getFileHolder(FileFactory.getFileType(fileLocation)); - offset = fileHolder.readLong(fileLocation, offset); - CarbonFooterReader metaDataReader = new CarbonFooterReader(fileLocation, offset); - fileFooter = metaDataReader.readFooter(); - } - setUpIsDone = true; - } - - @AfterClass public static void tearDownAfterClass() { - CarbonHiveContext.sql("drop CUBE validatefooter"); - } - - @Test public void testFileFooterExist() { - assertTrue(fileFooter != null); - } - - @Test public void testFileFooterVersion() { - assertTrue(fileFooter.getVersion() >= 0); - } - - @Test public void testFileFooterNumRows() { - assertTrue(fileFooter.getNum_rows() > 0); - } - - @Test public void testFileFooterTableColumns() { - assertTrue(fileFooter.getTable_columns() != null && fileFooter.getTable_columns().size() > 0); - } - - @Test public void testFileFooterSegmentInfo() { - assertTrue( - fileFooter.getSegment_info() != null && fileFooter.getSegment_info().getNum_cols() > 0 - && fileFooter.getSegment_info().getColumn_cardinalities().size() > 0); - } - - @Test public void testFileFooterBlockletIndex() { - assertTrue(fileFooter.getBlocklet_index_list() != null - && fileFooter.getBlocklet_index_list().size() > 0); - for (BlockletIndex blockletIndex : fileFooter.getBlocklet_index_list()) { - assertTrue(blockletIndex.getMin_max_index().getMin_values() != null - && blockletIndex.getMin_max_index().getMin_values().size() > 0 - && blockletIndex.getMin_max_index().getMax_values() != null - && blockletIndex.getMin_max_index().getMax_values().size() > 0 - && blockletIndex.getMin_max_index().getMin_values().size() == blockletIndex - .getMin_max_index().getMax_values().size()); - assertTrue(blockletIndex.getB_tree_index().getStart_key() != null - && blockletIndex.getB_tree_index().getEnd_key() != null); - } - } - - @Test public void testFileFooterBlockletInfo() { - assertTrue(fileFooter.getBlocklet_info_list() != null - && fileFooter.getBlocklet_info_list().size() > 0); - for (BlockletInfo blockletInfo : fileFooter.getBlocklet_info_list()) { - assertTrue(blockletInfo.getNum_rows() > 0 && blockletInfo.getColumn_data_chunks() != null - && blockletInfo.getColumn_data_chunks().size() > 0); - for (DataChunk columnDataChunk : blockletInfo.getColumn_data_chunks()) { - testColumnDataChunk(columnDataChunk); - } - } - } - - private void testColumnDataChunk(DataChunk columnDatachunk) { - assertTrue(columnDatachunk.getEncoders() != null && columnDatachunk.getChunk_meta() != null - && columnDatachunk.getChunk_meta().getCompression_codec() != null); - // For Measure - if (columnDatachunk.getEncoders().contains(Encoding.DELTA)) { - assertTrue( - columnDatachunk.getPresence() != null && columnDatachunk.getEncoder_meta() != null); - } else { - assertTrue(columnDatachunk.getSort_state() != null); - } - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala new file mode 100644 index 0000000..b0e4833 --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.datamap + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{DataFrame, SaveMode} +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.events.ChangeEvent +import org.apache.carbondata.core.indexstore.schema.FilterType +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.metadata.datatype.DataType +import org.apache.carbondata.core.util.CarbonProperties + +class C2DataMapFactory() extends DataMapFactory { + + override def init(identifier: AbsoluteTableIdentifier, + dataMapName: String): Unit = {} + + override def fireEvent(event: ChangeEvent[_]): Unit = ??? + + override def clear(segmentId: String): Unit = ??? + + override def clear(): Unit = ??? + + override def getDataMap(distributable: DataMapDistributable): DataMap = ??? + + override def getDataMaps(segmentId: String): util.List[DataMap] = ??? + + override def createWriter(segmentId: String): DataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock + + override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, FilterType.EQUALTO) +} + +class DataMapWriterSuite extends QueryTest with BeforeAndAfterAll { + + def buildTestData(numRows: Int): DataFrame = { + import sqlContext.implicits._ + sqlContext.sparkContext.parallelize(1 to numRows) + .map(x => ("a", "b", x)) + .toDF("c1", "c2", "c3") + } + + def dropTable(): Unit = { + sql("DROP TABLE IF EXISTS carbon1") + sql("DROP TABLE IF EXISTS carbon2") + } + + override def beforeAll { + dropTable() + } + + test("test write datamap 2 pages") { + // register datamap writer + DataMapStoreManager.getInstance().createAndRegisterDataMap( + AbsoluteTableIdentifier.from(storeLocation, "default", "carbon1"), + classOf[C2DataMapFactory], + "test") + + val df = buildTestData(33000) + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", "carbon1") + .mode(SaveMode.Overwrite) + .save() + + assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) + assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) + assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( + "blocklet start 0", + "add page data: blocklet 0, page 0", + "add page data: blocklet 0, page 1", + "blocklet end: 0" + )) + DataMapWriterSuite.callbackSeq = Seq() + } + + test("test write datamap 2 blocklet") { + // register datamap writer + DataMapStoreManager.getInstance().createAndRegisterDataMap( + AbsoluteTableIdentifier.from(storeLocation, "default", "carbon2"), + classOf[C2DataMapFactory], + "test") + + CarbonProperties.getInstance() + .addProperty("carbon.blockletgroup.size.in.mb", "1") + + val df = buildTestData(300000) + + // save dataframe to carbon file + df.write + .format("carbondata") + .option("tableName", "carbon2") + .mode(SaveMode.Overwrite) + .save() + + assert(DataMapWriterSuite.callbackSeq.head.contains("block start")) + assert(DataMapWriterSuite.callbackSeq.last.contains("block end")) + assert(DataMapWriterSuite.callbackSeq.slice(1, DataMapWriterSuite.callbackSeq.length - 1) == Seq( + "blocklet start 0", + "add page data: blocklet 0, page 0", + "add page data: blocklet 0, page 1", + "add page data: blocklet 0, page 2", + "add page data: blocklet 0, page 3", + "add page data: blocklet 0, page 4", + "add page data: blocklet 0, page 5", + "add page data: blocklet 0, page 6", + "add page data: blocklet 0, page 7", + "blocklet end: 0", + "blocklet start 1", + "add page data: blocklet 1, page 0", + "add page data: blocklet 1, page 1", + "blocklet end: 1" + )) + DataMapWriterSuite.callbackSeq = Seq() + } + + override def afterAll { + dropTable() + } +} + +object DataMapWriterSuite { + var callbackSeq: Seq[String] = Seq[String]() + + val dataMapWriterC2Mock = new DataMapWriter { + + override def onPageAdded( + blockletId: Int, + pageId: Int, + pages: Array[ColumnPage]): Unit = { + assert(pages.length == 1) + assert(pages(0).getDataType == DataType.BYTE_ARRAY) + val bytes: Array[Byte] = pages(0).getByteArrayPage()(0) + assert(bytes.sameElements(Seq(0, 1, 'b'.toByte))) + callbackSeq :+= s"add page data: blocklet $blockletId, page $pageId" + } + + override def onBlockletEnd(blockletId: Int): Unit = { + callbackSeq :+= s"blocklet end: $blockletId" + } + + override def onBlockEnd(blockId: String): Unit = { + callbackSeq :+= s"block end $blockId" + } + + override def onBlockletStart(blockletId: Int): Unit = { + callbackSeq :+= s"blocklet start $blockletId" + } + + override def onBlockStart(blockId: String): Unit = { + callbackSeq :+= s"block start $blockId" + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala index e0829ed..a6a8835 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala @@ -256,10 +256,7 @@ object DataManagementFunc { compactionModel.compactionType ) - val future: Future[Void] = executor - .submit(new CompactionCallable(compactionCallableModel - ) - ) + val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel)) futureList.add(future) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index a32146a..90f57a9 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -31,9 +31,9 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.fileoperations.FileWriteOperation -import org.apache.carbondata.core.indexstore.DataMapStoreManager import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 4620db0..1837c04 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.{RuntimeConfig, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree -import org.apache.carbondata.core.indexstore.DataMapStoreManager +import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala index e92d06d..697b727 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/iud/DeleteCarbonTableSubqueryTestCase.scala @@ -43,6 +43,7 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf sql("""select c1 from iud_db_sub.dest"""), Seq(Row("c"), Row("d"), Row("e")) ) + sql("drop table if exists iud_db_sub.dest") } test("delete data from carbon table[where IN (sub query with where clause) ]") { @@ -54,10 +55,12 @@ class DeleteCarbonTableSubqueryTestCase extends Spark2QueryTest with BeforeAndAf sql("""select c1 from iud_db_sub.dest"""), Seq(Row("a"), Row("c"), Row("d"), Row("e")) ) + sql("drop table if exists iud_db_sub.dest") } override def afterAll { - sql("use default") + sql("drop table if exists iud_db_sub.source2") sql("drop database if exists iud_db_sub cascade") + sql("use default") } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java deleted file mode 100644 index 12fe27b..0000000 --- a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.core.datastore; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -import org.apache.carbondata.core.devapi.DictionaryGenerationException; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.keygenerator.KeyGenerator; - -/** - * Generic DataType interface which will be used while data loading for complex types like Array & - * Struct - */ -public interface GenericDataType<T> { - - /** - * @return name of the column - */ - String getName(); - - /** - * @return - columns parent name - */ - String getParentname(); - - /** - * @param children - To add children dimension for parent complex type - */ - void addChildren(GenericDataType children); - - /** - * @param primitiveChild - Returns all primitive type columns in complex type - */ - void getAllPrimitiveChildren(List<GenericDataType> primitiveChild); - - /** - * writes to byte stream - * @param dataOutputStream - * @throws IOException - */ - void writeByteArray(T input, DataOutputStream dataOutputStream) - throws IOException, DictionaryGenerationException; - - /** - * @return surrogateIndex for primitive column in complex type - */ - int getSurrogateIndex(); - - /** - * @param surrIndex - surrogate index of primitive column in complex type - */ - void setSurrogateIndex(int surrIndex); - - /** - * converts integer surrogate to bit packed surrogate value - * @param byteArrayInput - * @param dataOutputStream - * @param generator - * @throws IOException - * @throws KeyGenException - */ - void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream, - KeyGenerator[] generator) throws IOException, KeyGenException; - - /** - * @return columns count of each complex type - */ - int getColsCount(); - - /** - * @return column uuid string - */ - String getColumnId(); - - /** - * set array index to be referred while creating metadata column - * @param outputArrayIndex - */ - void setOutputArrayIndex(int outputArrayIndex); - - /** - * @return array index count of metadata column - */ - int getMaxOutputArrayIndex(); - - /** - * Split byte array into complex metadata column and primitive column - * @param columnsArray - * @param inputArray - */ - void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray); - - /** - * @return current read row count - */ - int getDataCounter(); - - /** - * fill agg key block including complex types - * @param aggKeyBlockWithComplex - * @param aggKeyBlock - */ - void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock); - - /** - * fill block key size including complex types - * @param blockKeySizeWithComplex - * @param primitiveBlockKeySize - */ - void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize); - - /** - * fill cardinality value including complex types - * @param dimCardWithComplex - * @param maxSurrogateKeyArray - */ - void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray); - - /** - * Fill the cardinality of the primitive datatypes - * @param dimCardWithComplex - */ - void fillCardinality(List<Integer> dimCardWithComplex); - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java new file mode 100644 index 0000000..4b0113c --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.datamap; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datamap.DataMapMeta; +import org.apache.carbondata.core.datamap.DataMapStoreManager; +import org.apache.carbondata.core.datamap.TableDataMap; +import org.apache.carbondata.core.datamap.dev.DataMapFactory; +import org.apache.carbondata.core.datamap.dev.DataMapWriter; +import org.apache.carbondata.core.datastore.page.ColumnPage; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.processing.store.TablePage; + +/** + * It is for writing DataMap for one table + */ +public class DataMapWriterListener { + + private static final LogService LOG = LogServiceFactory.getLogService( + DataMapWriterListener.class.getCanonicalName()); + + // list indexed column name -> list of data map writer + private Map<List<String>, List<DataMapWriter>> registry = new ConcurrentHashMap<>(); + + /** + * register all datamap writer for specified table and segment + */ + public void registerAllWriter(AbsoluteTableIdentifier identifier, String segmentId) { + List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(identifier); + if (tableDataMaps != null) { + for (TableDataMap tableDataMap : tableDataMaps) { + DataMapFactory factory = tableDataMap.getDataMapFactory(); + register(factory, segmentId); + } + } + } + + /** + * Register a DataMapWriter + */ + private void register(DataMapFactory factory, String segmentId) { + assert (factory != null); + assert (segmentId != null); + DataMapMeta meta = factory.getMeta(); + if (meta == null) { + // if data map does not have meta, no need to register + return; + } + List<String> columns = factory.getMeta().getIndexedColumns(); + List<DataMapWriter> writers = registry.get(columns); + DataMapWriter writer = factory.createWriter(segmentId); + if (writers != null) { + writers.add(writer); + } else { + writers = new ArrayList<>(); + writers.add(writer); + registry.put(columns, writers); + } + LOG.info("DataMapWriter " + writer + " added"); + } + + public void onBlockStart(String blockId) { + for (List<DataMapWriter> writers : registry.values()) { + for (DataMapWriter writer : writers) { + writer.onBlockStart(blockId); + } + } + } + + public void onBlockEnd(String blockId) { + for (List<DataMapWriter> writers : registry.values()) { + for (DataMapWriter writer : writers) { + writer.onBlockEnd(blockId); + } + } + } + + public void onBlockletStart(int blockletId) { + for (List<DataMapWriter> writers : registry.values()) { + for (DataMapWriter writer : writers) { + writer.onBlockletStart(blockletId); + } + } + } + + public void onBlockletEnd(int blockletId) { + for (List<DataMapWriter> writers : registry.values()) { + for (DataMapWriter writer : writers) { + writer.onBlockletEnd(blockletId); + } + } + } + + /** + * Pick corresponding column pages and add to all registered datamap + * + * @param pageId sequence number of page, start from 0 + * @param tablePage page data + */ + public void onPageAdded(int blockletId, int pageId, TablePage tablePage) { + Set<Map.Entry<List<String>, List<DataMapWriter>>> entries = registry.entrySet(); + for (Map.Entry<List<String>, List<DataMapWriter>> entry : entries) { + List<String> indexedColumns = entry.getKey(); + ColumnPage[] pages = new ColumnPage[indexedColumns.size()]; + for (int i = 0; i < indexedColumns.size(); i++) { + pages[i] = tablePage.getColumnPage(indexedColumns.get(i)); + } + List<DataMapWriter> writers = entry.getValue(); + for (DataMapWriter writer : writers) { + writer.onPageAdded(blockletId, pageId, pages); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java index f5fdd4d..02ceb06 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java new file mode 100644 index 0000000..6b54d2d --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/GenericDataType.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.datatypes; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.devapi.DictionaryGenerationException; +import org.apache.carbondata.core.keygenerator.KeyGenException; +import org.apache.carbondata.core.keygenerator.KeyGenerator; + +/** + * Generic DataType interface which will be used while data loading for complex types like Array & + * Struct + */ +public interface GenericDataType<T> { + + /** + * @return name of the column + */ + String getName(); + + /** + * @return - columns parent name + */ + String getParentname(); + + /** + * @param children - To add children dimension for parent complex type + */ + void addChildren(GenericDataType children); + + /** + * @param primitiveChild - Returns all primitive type columns in complex type + */ + void getAllPrimitiveChildren(List<GenericDataType> primitiveChild); + + /** + * writes to byte stream + * @param dataOutputStream + * @throws IOException + */ + void writeByteArray(T input, DataOutputStream dataOutputStream) + throws IOException, DictionaryGenerationException; + + /** + * @return surrogateIndex for primitive column in complex type + */ + int getSurrogateIndex(); + + /** + * @param surrIndex - surrogate index of primitive column in complex type + */ + void setSurrogateIndex(int surrIndex); + + /** + * converts integer surrogate to bit packed surrogate value + * @param byteArrayInput + * @param dataOutputStream + * @param generator + * @throws IOException + * @throws KeyGenException + */ + void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream, + KeyGenerator[] generator) throws IOException, KeyGenException; + + /** + * @return columns count of each complex type + */ + int getColsCount(); + + /** + * @return column uuid string + */ + String getColumnId(); + + /** + * set array index to be referred while creating metadata column + * @param outputArrayIndex + */ + void setOutputArrayIndex(int outputArrayIndex); + + /** + * @return array index count of metadata column + */ + int getMaxOutputArrayIndex(); + + /** + * Split byte array into complex metadata column and primitive column + * @param columnsArray + * @param inputArray + */ + void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray); + + /** + * @return current read row count + */ + int getDataCounter(); + + /** + * fill agg key block including complex types + * @param aggKeyBlockWithComplex + * @param aggKeyBlock + */ + void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock); + + /** + * fill block key size including complex types + * @param blockKeySizeWithComplex + * @param primitiveBlockKeySize + */ + void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize); + + /** + * fill cardinality value including complex types + * @param dimCardWithComplex + * @param maxSurrogateKeyArray + */ + void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray); + + /** + * Fill the cardinality of the primitive datatypes + * @param dimCardWithComplex + */ + void fillCardinality(List<Integer> dimCardWithComplex); + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index a24a324..e7e48e9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -28,7 +28,6 @@ import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.devapi.BiDictionary; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.dictionary.client.DictionaryClient; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java index 94ee9f6..a61144e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/StructDataType.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.devapi.DictionaryGenerationException; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.KeyGenerator; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java index d5730a2..8feea6a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/ComplexFieldConverterImpl.java @@ -21,8 +21,8 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.util.List; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java index d30582b..e9b0a78 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java @@ -23,13 +23,13 @@ import java.util.Map; import org.apache.carbondata.core.cache.Cache; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.dictionary.client.DictionaryClient; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.encoder.Encoding; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.processing.datatypes.ArrayDataType; +import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.datatypes.PrimitiveDataType; import org.apache.carbondata.processing.datatypes.StructDataType; import org.apache.carbondata.processing.newflow.DataField; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 9c48af7..a716340 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -35,10 +35,8 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.page.EncodedTablePage; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter; @@ -51,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.newflow.sort.SortScopeOptions; import org.apache.carbondata.processing.store.file.FileManager; import org.apache.carbondata.processing.store.file.IFileManagerComposite; @@ -74,6 +73,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * data writer */ private CarbonFactDataWriter dataWriter; + /** * File manager */ @@ -87,11 +87,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process * once this size of input is reached */ - private int blockletSize; - /** - * keyGenerator - */ - private ColumnarSplitter columnarSplitter; + private int pageSize; /** * keyBlockHolder */ @@ -120,7 +116,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { /** * a private class that will hold the data for blocklets */ - private BlockletDataHolder blockletDataHolder; + private TablePageList tablePageList; /** * number of cores configured */ @@ -146,8 +142,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private ColumnarFormatVersion version; - private SortScopeOptions.SortScope sortScope; - /** * CarbonFactDataHandler constructor */ @@ -202,11 +196,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { noInvertedIdxCol += (cd.getColName() + ","); } } + LOGGER.info("Columns considered as NoInverted Index are " + noInvertedIdxCol); } private void initParameters(CarbonFactDataHandlerModel model) { - this.sortScope = model.getSortScope(); + SortScopeOptions.SortScope sortScope = model.getSortScope(); this.colGrpModel = model.getSegmentProperties().getColumnGroupModel(); //TODO need to pass carbon table identifier to metadata @@ -254,10 +249,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { consumerExecutorService = Executors.newFixedThreadPool(1); consumerExecutorServiceTaskList = new ArrayList<>(1); semaphore = new Semaphore(numberOfCores); - blockletDataHolder = new BlockletDataHolder(); + tablePageList = new TablePageList(); // Start the consumer which will take each blocklet/page in order and write to a file - Consumer consumer = new Consumer(blockletDataHolder); + Consumer consumer = new Consumer(tablePageList); consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer)); } @@ -314,20 +309,20 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { this.entryCount++; // if entry count reaches to leaf node size then we are ready to write // this to leaf node file and update the intermediate files - if (this.entryCount == this.blockletSize) { + if (this.entryCount == this.pageSize) { try { semaphore.acquire(); producerExecutorServiceTaskList.add( producerExecutorService.submit( - new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false) + new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, false) ) ); blockletProcessingCount.incrementAndGet(); // set the entry count to zero processedDataCount += entryCount; LOGGER.info("Total Number Of records added to store: " + processedDataCount); - dataRows = new ArrayList<>(this.blockletSize); + dataRows = new ArrayList<>(this.pageSize); this.entryCount = 0; } catch (InterruptedException e) { LOGGER.error(e, e.getMessage()); @@ -339,10 +334,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { /** * generate the EncodedTablePage from the input rows (one page in case of V3 format) */ - private EncodedTablePage processDataRows(List<CarbonRow> dataRows) + private TablePage processDataRows(List<CarbonRow> dataRows) throws CarbonDataWriterException, KeyGenException, MemoryException, IOException { if (dataRows.size() == 0) { - return EncodedTablePage.newEmptyInstance(); + return new TablePage(model, 0); } TablePage tablePage = new TablePage(model, dataRows.size()); int rowId = 0; @@ -352,11 +347,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { tablePage.addRow(rowId++, row); } - EncodedTablePage encoded = tablePage.encode(); - tablePage.freeMemory(); + tablePage.encode(); LOGGER.info("Number Of records processed: " + dataRows.size()); - return encoded; + return tablePage; } /** @@ -370,7 +364,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { try { semaphore.acquire(); producerExecutorServiceTaskList.add(producerExecutorService - .submit(new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, true))); + .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true))); blockletProcessingCount.incrementAndGet(); processedDataCount += entryCount; closeWriterExecutionService(producerExecutorService); @@ -471,19 +465,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private void setWritingConfiguration() throws CarbonDataWriterException { // get blocklet size - this.blockletSize = Integer.parseInt(CarbonProperties.getInstance() + this.pageSize = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)); if (version == ColumnarFormatVersion.V3) { - this.blockletSize = Integer.parseInt(CarbonProperties.getInstance() + this.pageSize = Integer.parseInt(CarbonProperties.getInstance() .getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT)); } - LOGGER.info("Number of rows per column blocklet " + blockletSize); - dataRows = new ArrayList<>(this.blockletSize); + LOGGER.info("Number of rows per column blocklet " + pageSize); + dataRows = new ArrayList<>(this.pageSize); int dimSet = Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE); - // if atleast one dimension is present then initialize column splitter otherwise null + // if at least one dimension is present then initialize column splitter otherwise null int noOfColStore = colGrpModel.getNoOfColumnStore(); int[] keyBlockSize = new int[noOfColStore + getExpandedComplexColsCount()]; @@ -494,16 +488,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { //row store will be in single column store //e.g if {0,1,2,3,4,5} is dimension and {0,1,2) is row store dimension //than below splitter will return column as {0,1,2}{3}{4}{5} - this.columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter(); + ColumnarSplitter columnarSplitter = model.getSegmentProperties().getFixedLengthKeySplitter(); System.arraycopy(columnarSplitter.getBlockKeySize(), 0, keyBlockSize, 0, noOfColStore); this.keyBlockHolder = - new CarbonKeyBlockHolder[this.columnarSplitter.getBlockKeySize().length]; + new CarbonKeyBlockHolder[columnarSplitter.getBlockKeySize().length]; } else { this.keyBlockHolder = new CarbonKeyBlockHolder[0]; } for (int i = 0; i < keyBlockHolder.length; i++) { - this.keyBlockHolder[i] = new CarbonKeyBlockHolder(blockletSize); + this.keyBlockHolder[i] = new CarbonKeyBlockHolder(pageSize); this.keyBlockHolder[i].resetCounter(); } @@ -535,7 +529,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { .getBlockKeySize()); System.arraycopy(blockKeySize, noOfColStore, keyBlockSize, noOfColStore, blockKeySize.length - noOfColStore); - this.dataWriter = getFactDataWriter(keyBlockSize); + this.dataWriter = getFactDataWriter(); this.dataWriter.setIsNoDictionary(isNoDictionary); // initialize the channel; this.dataWriter.initializeWriter(); @@ -574,21 +568,19 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { /** * Below method will be used to get the fact data writer instance * - * @param keyBlockSize * @return data writer instance */ - private CarbonFactDataWriter<?> getFactDataWriter(int[] keyBlockSize) { + private CarbonFactDataWriter<?> getFactDataWriter() { return CarbonDataWriterFactory.getInstance() - .getFactDataWriter(version, getDataWriterVo(keyBlockSize)); + .getFactDataWriter(version, getDataWriterVo()); } /** * Below method will be used to get the writer vo * - * @param keyBlockSize size of each key block * @return data writer vo object */ - private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) { + private CarbonDataWriterVo getDataWriterVo() { CarbonDataWriterVo carbonDataWriterVo = new CarbonDataWriterVo(); carbonDataWriterVo.setStoreLocation(model.getStoreLocation()); carbonDataWriterVo.setMeasureCount(model.getMeasureCount()); @@ -608,6 +600,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { carbonDataWriterVo.setBucketNumber(model.getBucketId()); carbonDataWriterVo.setTaskExtension(model.getTaskExtension()); carbonDataWriterVo.setSchemaUpdatedTimeStamp(model.getSchemaUpdatedTimeStamp()); + carbonDataWriterVo.setListener(model.getDataMapWriterlistener()); return carbonDataWriterVo; } @@ -644,14 +637,13 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { } /** - * This class will hold the holder objects and manage producer and consumer for reading - * and writing the blocklet data + * This class will hold the table page data */ - private final class BlockletDataHolder { + private final class TablePageList { /** - * array of blocklet data holder objects + * array of table page added by Producer and get by Consumer */ - private EncodedTablePage[] encodedTablePages; + private TablePage[] tablePages; /** * flag to check whether the producer has completed processing for holder * object which is required to be picked form an index @@ -662,8 +654,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private int currentIndex; - private BlockletDataHolder() { - encodedTablePages = new EncodedTablePage[numberOfCores]; + private TablePageList() { + tablePages = new TablePage[numberOfCores]; available = new AtomicBoolean(false); } @@ -671,32 +663,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { * @return a node holder object * @throws InterruptedException if consumer thread is interrupted */ - public synchronized EncodedTablePage get() throws InterruptedException { - EncodedTablePage encodedTablePage = encodedTablePages[currentIndex]; + public synchronized TablePage get() throws InterruptedException { + TablePage tablePage = tablePages[currentIndex]; // if node holder is null means producer thread processing the data which has to // be inserted at this current index has not completed yet - if (null == encodedTablePage && !processingComplete) { + if (null == tablePage && !processingComplete) { available.set(false); } while (!available.get()) { wait(); } - encodedTablePage = encodedTablePages[currentIndex]; - encodedTablePages[currentIndex] = null; + tablePage = tablePages[currentIndex]; + tablePages[currentIndex] = null; currentIndex++; // reset current index when it reaches length of node holder array - if (currentIndex >= encodedTablePages.length) { + if (currentIndex >= tablePages.length) { currentIndex = 0; } - return encodedTablePage; + return tablePage; } /** * @param encodedTablePage * @param index */ - public synchronized void put(EncodedTablePage encodedTablePage, int index) { - encodedTablePages[index] = encodedTablePage; + public synchronized void put(TablePage tablePage, int index) { + tablePages[index] = tablePage; // notify the consumer thread when index at which object is to be inserted // becomes equal to current index from where data has to be picked for writing if (index == currentIndex) { @@ -711,16 +703,16 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private final class Producer implements Callable<Void> { - private BlockletDataHolder blockletDataHolder; + private TablePageList tablePageList; private List<CarbonRow> dataRows; - private int sequenceNumber; + private int pageId; private boolean isLastPage; - private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows, - int sequenceNumber, boolean isLastPage) { - this.blockletDataHolder = blockletDataHolder; + private Producer(TablePageList tablePageList, List<CarbonRow> dataRows, + int pageId, boolean isLastPage) { + this.tablePageList = tablePageList; this.dataRows = dataRows; - this.sequenceNumber = sequenceNumber; + this.pageId = pageId; this.isLastPage = isLastPage; } @@ -732,11 +724,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ @Override public Void call() throws Exception { try { - EncodedTablePage encodedTablePage = processDataRows(dataRows); - encodedTablePage.setIsLastPage(isLastPage); + TablePage tablePage = processDataRows(dataRows); + tablePage.setIsLastPage(isLastPage); // insert the object in array according to sequence number - int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores; - blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray); + int indexInNodeHolderArray = (pageId - 1) % numberOfCores; + tablePageList.put(tablePage, indexInNodeHolderArray); return null; } catch (Throwable throwable) { LOGGER.error(throwable, "Error in producer"); @@ -752,10 +744,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ private final class Consumer implements Callable<Void> { - private BlockletDataHolder blockletDataHolder; + private TablePageList tablePageList; - private Consumer(BlockletDataHolder blockletDataHolder) { - this.blockletDataHolder = blockletDataHolder; + private Consumer(TablePageList tablePageList) { + this.tablePageList = tablePageList; } /** @@ -766,11 +758,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { */ @Override public Void call() throws Exception { while (!processingComplete || blockletProcessingCount.get() > 0) { - EncodedTablePage encodedTablePage = null; + TablePage tablePage = null; try { - encodedTablePage = blockletDataHolder.get(); - if (null != encodedTablePage) { - dataWriter.writeTablePage(encodedTablePage); + tablePage = tablePageList.get(); + if (null != tablePage) { + dataWriter.writeTablePage(tablePage); + tablePage.freeMemory(); } blockletProcessingCount.decrementAndGet(); } catch (Throwable throwable) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 51ec84b..c059030 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.GenericDataType; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.keygenerator.KeyGenerator; @@ -39,6 +38,8 @@ import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.processing.datamap.DataMapWriterListener; +import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.carbondata.processing.model.CarbonLoadModel; import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; @@ -159,6 +160,8 @@ public class CarbonFactDataHandlerModel { private SortScopeOptions.SortScope sortScope; + private DataMapWriterListener dataMapWriterlistener; + /** * Create the model using @{@link CarbonDataLoadConfiguration} */ @@ -254,6 +257,11 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.taskExtension = taskExtension; carbonFactDataHandlerModel.tableSpec = configuration.getTableSpec(); carbonFactDataHandlerModel.sortScope = CarbonDataProcessorUtil.getSortScope(configuration); + + DataMapWriterListener listener = new DataMapWriterListener(); + listener.registerAllWriter(configuration.getTableIdentifier(), configuration.getSegmentId()); + carbonFactDataHandlerModel.dataMapWriterlistener = listener; + return carbonFactDataHandlerModel; } @@ -557,5 +565,9 @@ public class CarbonFactDataHandlerModel { public SortScopeOptions.SortScope getSortScope() { return sortScope; } + + public DataMapWriterListener getDataMapWriterlistener() { + return dataMapWriterlistener; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java index 03f3e5e..d2363f1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java @@ -24,7 +24,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.apache.carbondata.core.datastore.GenericDataType; +import org.apache.carbondata.core.datastore.DimensionType; import org.apache.carbondata.core.datastore.TableSpec; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.page.ColumnPage; @@ -44,9 +44,8 @@ import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.memory.MemoryException; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.datatypes.GenericDataType; import org.apache.spark.sql.types.Decimal; @@ -73,7 +72,12 @@ public class TablePage { private TablePageKey key; - private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion(); + private EncodedTablePage encodedTablePage; + + private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy(); + + // true if it is last page of all input rows + private boolean isLastPage; TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException { this.model = model; @@ -240,14 +244,16 @@ public class TablePage { return output; } - EncodedTablePage encode() throws KeyGenException, MemoryException, IOException { + void encode() throws KeyGenException, MemoryException, IOException { // encode dimensions and measure EncodedDimensionPage[] dimensions = encodeAndCompressDimensions(); EncodedMeasurePage[] measures = encodeAndCompressMeasures(); - return EncodedTablePage.newInstance(pageSize, dimensions, measures, key); + this.encodedTablePage = EncodedTablePage.newInstance(pageSize, dimensions, measures, key); } - private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy(); + public EncodedTablePage getEncodedTablePage() { + return encodedTablePage; + } // apply measure and set encodedData in `encodedData` private EncodedMeasurePage[] encodeAndCompressMeasures() @@ -301,6 +307,52 @@ public class TablePage { encodedDimensions.addAll(encodedComplexDimenions); return encodedDimensions.toArray(new EncodedDimensionPage[encodedDimensions.size()]); } + + /** + * return column page of specified column name + */ + public ColumnPage getColumnPage(String columnName) { + int dictDimensionIndex = -1; + int noDictDimensionIndex = -1; + ColumnPage page = null; + TableSpec spec = model.getTableSpec(); + int numDimensions = spec.getNumDimensions(); + for (int i = 0; i < numDimensions; i++) { + DimensionType type = spec.getDimensionSpec(i).getDimensionType(); + if ((type == DimensionType.GLOBAL_DICTIONARY) || (type == DimensionType.DIRECT_DICTIONARY)) { + page = dictDimensionPages[++dictDimensionIndex]; + } else if (type == DimensionType.PLAIN_VALUE) { + page = noDictDimensionPages[++noDictDimensionIndex]; + } else { + // do not support datamap on complex column + continue; + } + String fieldName = spec.getDimensionSpec(i).getFieldName(); + if (fieldName.equalsIgnoreCase(columnName)) { + return page; + } + } + int numMeasures = spec.getNumMeasures(); + for (int i = 0; i < numMeasures; i++) { + String fieldName = spec.getMeasureSpec(i).getFieldName(); + if (fieldName.equalsIgnoreCase(columnName)) { + return measurePage[i]; + } + } + throw new IllegalArgumentException("DataMap: must have '" + columnName + "' column in schema"); + } + + public boolean isLastPage() { + return isLastPage; + } + + public void setIsLastPage(boolean isWriteAll) { + this.isLastPage = isWriteAll; + } + + public int getPageSize() { + return pageSize; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index a34ed01..bcc0112 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -24,7 +24,6 @@ import java.io.FileFilter; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -//import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.Arrays; @@ -44,17 +43,14 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.page.EncodedTablePage; import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor; import org.apache.carbondata.core.metadata.BlockletInfoColumnar; -import org.apache.carbondata.core.metadata.CarbonMetadata; import org.apache.carbondata.core.metadata.blocklet.index.BlockletBTreeIndex; import org.apache.carbondata.core.metadata.blocklet.index.BlockletIndex; import org.apache.carbondata.core.metadata.blocklet.index.BlockletMinMaxIndex; import org.apache.carbondata.core.metadata.converter.SchemaConverter; import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; import org.apache.carbondata.core.metadata.index.BlockIndexInfo; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonMergerUtil; @@ -66,6 +62,7 @@ import org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; import org.apache.carbondata.format.BlockletInfo3; import org.apache.carbondata.format.IndexHeader; +import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.store.file.FileData; import org.apache.commons.lang3.ArrayUtils; @@ -97,11 +94,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< protected String carbonDataFileTempPath; /** - * The name of carbonData file + * The name of carbonData file (blockId) */ protected String carbonDataFileName; /** + * The sequence number of blocklet inside one block + */ + protected int blockletId = 0; + + /** + * The sequence number of page inside one blocklet + */ + protected int pageId = 0; + + /** * Local cardinality for the segment */ protected int[] localCardinality; @@ -132,7 +139,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< /** * data block size for one carbon data file */ - private long dataBlockSize; + private long blockSizeThreshold; /** * file size at any given point */ @@ -152,6 +159,11 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< */ protected List<org.apache.carbondata.format.BlockletIndex> blockletIndex; + /** + * listener to write data map + */ + protected DataMapWriterListener listener; + public AbstractFactDataWriter(CarbonDataWriterVo dataWriterVo) { this.dataWriterVo = dataWriterVo; this.blockletInfoList = @@ -163,22 +175,21 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< this.fileSizeInBytes = (long) dataWriterVo.getTableBlocksize() * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR * CarbonCommonConstants.BYTE_TO_KB_CONVERSION_FACTOR; - /* - size reserved in one file for writing block meta data. It will be in percentage - */ + + // size reserved in one file for writing block meta data. It will be in percentage int spaceReservedForBlockMetaSize = Integer.parseInt(propInstance .getProperty(CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE, CarbonCommonConstants.CARBON_BLOCK_META_RESERVED_SPACE_DEFAULT)); - this.dataBlockSize = fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100; - LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + dataBlockSize); + this.blockSizeThreshold = + fileSizeInBytes - (fileSizeInBytes * spaceReservedForBlockMetaSize) / 100; + LOGGER.info("Total file size: " + fileSizeInBytes + " and dataBlock Size: " + + blockSizeThreshold); this.executorService = Executors.newFixedThreadPool(1); executorServiceSubmitList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); // in case of compaction we will pass the cardinality. this.localCardinality = dataWriterVo.getColCardinality(); - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable( - dataWriterVo.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + dataWriterVo - .getTableName()); + //TODO: We should delete the levelmetadata file after reading here. // so only data loading flow will need to read from cardinality file. if (null == this.localCardinality) { @@ -202,6 +213,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< this.dataChunksLength = new ArrayList<>(); blockletMetadata = new ArrayList<BlockletInfo3>(); blockletIndex = new ArrayList<>(); + listener = dataWriterVo.getListener(); } /** @@ -241,18 +253,19 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< } /** - * This method will be used to update the file channel with new file; new - * file will be created once existing file reached the file size limit This + * This method will be used to update the file channel with new file if exceeding block size + * threshold, new file will be created once existing file reached the file size limit This * method will first check whether existing file size is exceeded the file * size limit if yes then write the leaf metadata to file then set the * current file size to 0 close the existing file channel get the new file * name and get the channel for new file * - * @param blockletDataSize data size of one block + * @param blockletSizeToBeAdded data size of one block * @throws CarbonDataWriterException if any problem */ - protected void updateBlockletFileChannel(long blockletDataSize) throws CarbonDataWriterException { - if ((currentFileSize + blockletDataSize) >= dataBlockSize && currentFileSize != 0) { + protected void createNewFileIfReachThreshold(long blockletSizeToBeAdded) + throws CarbonDataWriterException { + if ((currentFileSize + blockletSizeToBeAdded) >= blockSizeThreshold && currentFileSize != 0) { // set the current file size to zero LOGGER.info("Writing data to file as max file size reached for file: " + carbonDataFileTempPath + " .Data block size: " + currentFileSize); @@ -265,16 +278,42 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< this.dataChunksLength = new ArrayList<>(); this.blockletMetadata = new ArrayList<>(); this.blockletIndex = new ArrayList<>(); - CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); - // rename carbon data file from in progress status to actual - renameCarbonDataFile(); - executorServiceSubmitList.add(executorService - .submit(new CopyThread(this.carbonDataFileTempPath - .substring(0, this.carbonDataFileTempPath.lastIndexOf('.'))))); + commitCurrentFile(false); // initialize the new channel initializeWriter(); } - currentFileSize += blockletDataSize; + currentFileSize += blockletSizeToBeAdded; + } + + private void notifyDataMapBlockStart() { + if (listener != null) { + listener.onBlockStart(carbonDataFileName); + } + } + + private void notifyDataMapBlockEnd() { + if (listener != null) { + listener.onBlockEnd(carbonDataFileName); + } + blockletId = 0; + } + + /** + * Finish writing current file. It will flush stream, copy and rename temp file to final file + * @param copyInCurrentThread set to false if want to do data copy in a new thread + */ + protected void commitCurrentFile(boolean copyInCurrentThread) { + notifyDataMapBlockEnd(); + CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); + // rename carbon data file from in progress status to actual + renameCarbonDataFile(); + String fileName = this.carbonDataFileTempPath.substring(0, + this.carbonDataFileTempPath.lastIndexOf('.')); + if (copyInCurrentThread) { + copyCarbonDataFileToCarbonStorePath(fileName); + } else { + executorServiceSubmitList.add(executorService.submit(new CopyThread(fileName))); + } } /** @@ -310,6 +349,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< throw new CarbonDataWriterException("Problem while getting the FileChannel for Leaf File", fileNotFoundException); } + notifyDataMapBlockStart(); } private int initFileCount() { @@ -433,18 +473,15 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< * @throws CarbonDataWriterException */ public void closeWriter() throws CarbonDataWriterException { - CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); if (this.blockletInfoList.size() > 0) { - renameCarbonDataFile(); - copyCarbonDataFileToCarbonStorePath( - this.carbonDataFileTempPath - .substring(0, this.carbonDataFileTempPath.lastIndexOf('.'))); + commitCurrentFile(true); try { writeIndexFile(); } catch (IOException e) { throw new CarbonDataWriterException("Problem while writing the index file", e); } } + CarbonUtil.closeStreams(this.fileOutputStream, this.fileChannel); closeExecutorService(); } @@ -590,17 +627,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< } /** - * This method will be used to write leaf data to file - * file format - * <key><measure1><measure2>.... - * - * @throws CarbonDataWriterException - * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem - */ - public abstract void writeTablePage(EncodedTablePage encodedTablePage) - throws CarbonDataWriterException; - - /** * Below method will be used to update the min or max value * by removing the length from it * @@ -608,10 +634,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< */ protected byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) { return valueWithLength; -// ByteBuffer buffer = ByteBuffer.wrap(valueWithLength); -// byte[] actualValue = new byte[buffer.getShort()]; -// buffer.get(actualValue); -// return actualValue; } /** @@ -640,5 +662,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter< copyCarbonDataFileToCarbonStorePath(fileName); return null; } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java index 225e031..26fff09 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java @@ -20,6 +20,7 @@ import java.util.List; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.store.CarbonDataFileAttributes; import org.apache.carbondata.processing.store.file.IFileManagerComposite; @@ -64,6 +65,8 @@ public class CarbonDataWriterVo { private int taskExtension; + private DataMapWriterListener listener; + /** * @return the storeLocation */ @@ -303,4 +306,12 @@ public class CarbonDataWriterVo { public void setTaskExtension(int taskExtension) { this.taskExtension = taskExtension; } + + public void setListener(DataMapWriterListener listener) { + this.listener = listener; + } + + public DataMapWriterListener getListener() { + return listener; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java index f194f74..3b26b7c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java @@ -18,14 +18,15 @@ package org.apache.carbondata.processing.store.writer; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.processing.store.TablePage; public interface CarbonFactDataWriter<T> { /** * write a encoded table page + * @param tablePage */ - void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException; + void writeTablePage(TablePage tablePage) throws CarbonDataWriterException; /** * Below method will be used to write the leaf meta data to file http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java index 0f1b52b..f849e21 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v1/CarbonFactDataWriterImplV1.java @@ -35,6 +35,7 @@ import org.apache.carbondata.core.util.CarbonMetadataUtil; import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.core.writer.CarbonFooterWriter; import org.apache.carbondata.format.FileFooter; +import org.apache.carbondata.processing.store.TablePage; import org.apache.carbondata.processing.store.writer.AbstractFactDataWriter; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; @@ -199,14 +200,14 @@ public class CarbonFactDataWriterImplV1 extends AbstractFactDataWriter<int[]> { return holder; } - @Override public void writeTablePage(EncodedTablePage encodedTablePage) + @Override public void writeTablePage(TablePage tablePage) throws CarbonDataWriterException { - if (encodedTablePage.getPageSize() == 0) { + if (tablePage.getPageSize() == 0) { return; } - long blockletDataSize = encodedTablePage.getEncodedSize(); - updateBlockletFileChannel(blockletDataSize); - NodeHolder nodeHolder = buildNodeHolder(encodedTablePage); + long blockletDataSize = tablePage.getEncodedTablePage().getEncodedSize(); + createNewFileIfReachThreshold(blockletDataSize); + NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage()); // write data to file and get its offset long offset = writeDataToFile(nodeHolder, fileChannel); // get the blocklet info for currently added blocklet http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java index e19a5ce..3f49a7b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v2/CarbonFactDataWriterImplV2.java @@ -37,6 +37,7 @@ import org.apache.carbondata.core.util.NodeHolder; import org.apache.carbondata.core.writer.CarbonFooterWriter; import org.apache.carbondata.format.DataChunk2; import org.apache.carbondata.format.FileFooter; +import org.apache.carbondata.processing.store.TablePage; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; import org.apache.carbondata.processing.store.writer.v1.CarbonFactDataWriterImplV1; @@ -63,19 +64,19 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { /** * Below method will be used to write the data to carbon data file * - * @param encodedTablePage + * @param tablePage * @throws CarbonDataWriterException any problem in writing operation */ - @Override public void writeTablePage(EncodedTablePage encodedTablePage) + @Override public void writeTablePage(TablePage tablePage) throws CarbonDataWriterException { - NodeHolder nodeHolder = buildNodeHolder(encodedTablePage); - if (encodedTablePage.getPageSize() == 0) { + NodeHolder nodeHolder = buildNodeHolder(tablePage.getEncodedTablePage()); + if (tablePage.getPageSize() == 0) { return; } // size to calculate the size of the blocklet int size = 0; // get the blocklet info object - BlockletInfoColumnar blockletInfo = getBlockletInfo(encodedTablePage, 0); + BlockletInfoColumnar blockletInfo = getBlockletInfo(tablePage.getEncodedTablePage(), 0); List<DataChunk2> datachunks = null; try { @@ -105,7 +106,7 @@ public class CarbonFactDataWriterImplV2 extends CarbonFactDataWriterImplV1 { nodeHolder.getTotalDimensionArrayLength() + nodeHolder.getTotalMeasureArrayLength() + size; // if size of the file already reached threshold size then create a new file and get the file // channel object - updateBlockletFileChannel(blockletDataSize); + createNewFileIfReachThreshold(blockletDataSize); // writer the version header in the file if current file size is zero // this is done so carbondata file can be read separately try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f089287c/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java new file mode 100644 index 0000000..68aee95 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/BlockletDataHolder.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.store.writer.v3; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.datastore.page.EncodedTablePage; +import org.apache.carbondata.processing.store.TablePage; + +public class BlockletDataHolder { + private List<EncodedTablePage> encodedTablePage; + private List<TablePage> rawTablePages; + private long currentSize; + + public BlockletDataHolder() { + this.encodedTablePage = new ArrayList<>(); + this.rawTablePages = new ArrayList<>(); + } + + public void clear() { + encodedTablePage.clear(); + rawTablePages.clear(); + currentSize = 0; + } + + public void addPage(TablePage rawTablePage) { + EncodedTablePage encodedTablePage = rawTablePage.getEncodedTablePage(); + this.encodedTablePage.add(encodedTablePage); + this.rawTablePages.add(rawTablePage); + currentSize += encodedTablePage.getEncodedSize(); + } + + public long getSize() { + // increasing it by 15 percent for data chunk 3 of each column each page + return currentSize + ((currentSize * 15) / 100); + } + + public int getNumberOfPagesAdded() { + return encodedTablePage.size(); + } + + public int getTotalRows() { + int rows = 0; + for (EncodedTablePage nh : encodedTablePage) { + rows += nh.getPageSize(); + } + return rows; + } + + public List<EncodedTablePage> getEncodedTablePages() { + return encodedTablePage; + } + + public List<TablePage> getRawTablePages() { + return rawTablePages; + } +}
