Repository: carbondata Updated Branches: refs/heads/master fe36e3bc9 -> cb51b8621
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala index 0c59bd9..3646fad 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala @@ -260,8 +260,8 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { checkAnswer(result_after, result_origin) val result_after1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area < 'OutSpace' ") - val rssult_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ") - checkAnswer(result_after1, rssult_origin1) + val result_origin1 = sql(s"select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area < 'OutSpace' ") + checkAnswer(result_after1, result_origin1) val result_after2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area where area <= 'OutSpace' ") val result_origin2 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <= 'OutSpace' ") @@ -279,28 +279,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val result_origin5 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area >= 'OutSpace' ") checkAnswer(result_after5, result_origin5) - sql("""ALTER TABLE list_table_area ADD PARTITION ('One', '(Two, Three)', 'Four')""".stripMargin) - val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") - val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) - val partitionIds1 = partitionInfo1.getPartitionIds - val new_list_info = partitionInfo1.getListInfo - assert(partitionIds1 == List(0, 1, 2, 3, 4, 5, 6, 7, 8).map(Integer.valueOf(_)).asJava) - assert(partitionInfo1.getMAX_PARTITION == 8) - assert(partitionInfo1.getNumPartitions == 9) - assert(new_list_info.get(0).get(0) == "Asia") - assert(new_list_info.get(1).get(0) == "America") - assert(new_list_info.get(2).get(0) == "Europe") - assert(new_list_info.get(3).get(0) == "OutSpace") - assert(new_list_info.get(4).get(0) == "Hi") - assert(new_list_info.get(5).get(0) == "One") - assert(new_list_info.get(6).get(0) == "Two") - assert(new_list_info.get(6).get(1) == "Three") - assert(new_list_info.get(7).get(0) == "Four") - validateDataFiles("default_list_table_area", "0", Seq(0, 1, 2, 4)) - - val result_after6 = sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area") - val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin""") - checkAnswer(result_after6, result_origin6) + intercept[Exception] { sql("""ALTER TABLE DROP PARTITION(0)""")} + intercept[Exception] { sql("""ALTER TABLE DROP PARTITION(0) WITH DATA""")} + + sql("""ALTER TABLE list_table_area DROP PARTITION(2) WITH DATA""") + val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_area") + val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds2 = partitionInfo2.getPartitionIds + val list_info2 = partitionInfo2.getListInfo + assert(partitionIds2 == List(0, 1, 3, 4, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo2.getMAX_PARTITION == 5) + assert(partitionInfo2.getNumPartitions == 5) + assert(list_info2.get(0).get(0) == "Asia") + assert(list_info2.get(1).get(0) == "Europe") + assert(list_info2.get(2).get(0) == "OutSpace") + assert(list_info2.get(3).get(0) == "Hi") + validateDataFiles("default_list_table_area", "0", Seq(0, 1, 4)) + checkAnswer(sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area"), + sql("select id, vin, logdate, phonenumber, country, area, salary from list_table_area_origin where area <> 'America' ")) } test("Alter table add partition: Range Partition") { @@ -309,9 +305,9 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) val partitionIds = partitionInfo.getPartitionIds val range_info = partitionInfo.getRangeInfo - assert(partitionIds.size() == 6) assert(partitionIds == List(0, 1, 2, 3, 4, 5).map(Integer.valueOf(_)).asJava) assert(partitionInfo.getMAX_PARTITION == 5) + assert(partitionInfo.getNumPartitions == 6) assert(range_info.get(0) == "2014/01/01") assert(range_info.get(1) == "2015/01/01") assert(range_info.get(2) == "2016/01/01") @@ -341,6 +337,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate where logdate > cast('2017/01/12 00:00:00' as timestamp) """) val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """) checkAnswer(result_after5, result_origin5) + + sql("""ALTER TABLE range_table_logdate DROP PARTITION(3) WITH DATA;""") + val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate") + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds1 = partitionInfo1.getPartitionIds + val range_info1 = partitionInfo1.getRangeInfo + assert(partitionIds1 == List(0, 1, 2, 4, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo1.getMAX_PARTITION == 5) + assert(partitionInfo1.getNumPartitions == 5) + assert(range_info1.get(0) == "2014/01/01") + assert(range_info1.get(1) == "2015/01/01") + assert(range_info1.get(2) == "2017/01/01") + assert(range_info1.get(3) == "2018/01/01") + assert(range_info1.size() == 4) + validateDataFiles("default_range_table_logdate", "0", Seq(1, 2, 4, 5)) + val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate""") + val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_origin where logdate < '2015/01/01 00:00:00' or logdate >= '2016/01/01 00:00:00' """) + checkAnswer(result_after6, result_origin6) } test("test exception if invalid partition id is provided in alter command") { @@ -396,6 +410,26 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country where country > 'NotGood' """) val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin where country > 'NotGood' """) checkAnswer(result_after5, result_origin5) + + sql("""ALTER TABLE list_table_country DROP PARTITION(8)""") + val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_list_table_country") + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds1 = partitionInfo1.getPartitionIds + val list_info1 = partitionInfo1.getListInfo + assert(partitionIds1 == List(0, 1, 2, 3, 6, 7, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo1.getMAX_PARTITION == 8) + assert(partitionInfo1.getNumPartitions == 7) + assert(list_info1.get(0).get(0) == "China") + assert(list_info1.get(0).get(1) == "US") + assert(list_info1.get(1).get(0) == "UK") + assert(list_info1.get(2).get(0) == "Japan") + assert(list_info1.get(3).get(0) == "Canada") + assert(list_info1.get(4).get(0) == "Russia") + assert(list_info1.get(5).get(0) == "Korea") + validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3)) + val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""") + val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""") + checkAnswer(result_origin6, result_after6) } test("Alter table split partition with different List Sequence: List Partition") { @@ -405,23 +439,21 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName) val partitionIds = partitionInfo.getPartitionIds val list_info = partitionInfo.getListInfo - assert(partitionIds == List(0, 1, 2, 3, 6, 7, 8, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava) + assert(partitionIds == List(0, 1, 2, 3, 6, 7, 5, 10, 11, 12).map(Integer.valueOf(_)).asJava) assert(partitionInfo.getMAX_PARTITION == 12) - assert(partitionInfo.getNumPartitions == 11) + assert(partitionInfo.getNumPartitions == 10) assert(list_info.get(0).get(0) == "China") assert(list_info.get(0).get(1) == "US") assert(list_info.get(1).get(0) == "UK") assert(list_info.get(2).get(0) == "Japan") assert(list_info.get(3).get(0) == "Canada") assert(list_info.get(4).get(0) == "Russia") - assert(list_info.get(5).get(0) == "Good") - assert(list_info.get(5).get(1) == "NotGood") - assert(list_info.get(6).get(0) == "Korea") - assert(list_info.get(7).get(0) == "Part4") - assert(list_info.get(8).get(0) == "Part2") - assert(list_info.get(9).get(0) == "Part1") - assert(list_info.get(9).get(1) == "Part3") - validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3, 8)) + assert(list_info.get(5).get(0) == "Korea") + assert(list_info.get(6).get(0) == "Part4") + assert(list_info.get(7).get(0) == "Part2") + assert(list_info.get(8).get(0) == "Part1") + assert(list_info.get(8).get(1) == "Part3") + validateDataFiles("default_list_table_country", "0", Seq(0, 1, 2, 3)) val result_after = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country""") val result_origin = sql("""select id, vin, logdate, phonenumber, country, area, salary from list_table_country_origin""") checkAnswer(result_after, result_origin) @@ -528,6 +560,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split where logdate > cast('2017/01/12 00:00:00' as timestamp) """) val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """) checkAnswer(result_after5, result_origin5) + + sql("""ALTER TABLE range_table_logdate_split DROP PARTITION(6)""") + val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_logdate_split") + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds1 = partitionInfo1.getPartitionIds + val rangeInfo1 = partitionInfo1.getRangeInfo + assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo1.getMAX_PARTITION == 6) + assert(partitionInfo1.getNumPartitions == 5) + assert(rangeInfo1.get(0) == "2014/01/01") + assert(rangeInfo1.get(1) == "2015/01/01") + assert(rangeInfo1.get(2) == "2016/01/01") + assert(rangeInfo1.get(3) == "2017/01/01") + assert(rangeInfo1.size() == 4) + validateDataFiles("default_range_table_logdate_split", "0", Seq(0, 1, 2, 3, 5)) + val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split""") + val result_origin6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_logdate_split_origin""") + checkAnswer(result_after6, result_origin6) } test("Alter table split partition: Range Partition + Bucket") { @@ -568,6 +618,57 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { val result_after5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket where logdate > cast('2017/01/12 00:00:00' as timestamp) """) val result_origin5 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate > cast('2017/01/12 00:00:00' as timestamp) """) checkAnswer(result_after5, result_origin5) + + sql("""ALTER TABLE range_table_bucket DROP PARTITION(6) WITH DATA""") + val carbonTable1 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") + val partitionInfo1 = carbonTable1.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds1 = partitionInfo1.getPartitionIds + val rangeInfo1 = partitionInfo1.getRangeInfo + assert(partitionIds1 == List(0, 1, 2, 3, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo1.getMAX_PARTITION == 6) + assert(partitionInfo1.getNumPartitions == 5) + assert(rangeInfo1.get(0) == "2014/01/01") + assert(rangeInfo1.get(1) == "2015/01/01") + assert(rangeInfo1.get(2) == "2016/01/01") + assert(rangeInfo1.get(3) == "2017/01/01") + assert(rangeInfo1.size() == 4) + validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 3, 5)) + val result_after6 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""") + val result_origin6= sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""") + checkAnswer(result_after6, result_origin6) + + sql("""ALTER TABLE range_table_bucket DROP PARTITION(3)""") + val carbonTable2 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") + val partitionInfo2 = carbonTable2.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds2 = partitionInfo2.getPartitionIds + val rangeInfo2 = partitionInfo2.getRangeInfo + assert(partitionIds2 == List(0, 1, 2, 5).map(Integer.valueOf(_)).asJava) + assert(partitionInfo2.getMAX_PARTITION == 6) + assert(partitionInfo2.getNumPartitions == 4) + assert(rangeInfo2.get(0) == "2014/01/01") + assert(rangeInfo2.get(1) == "2015/01/01") + assert(rangeInfo2.get(2) == "2017/01/01") + assert(rangeInfo2.size() == 3) + validateDataFiles("default_range_table_bucket", "0", Seq(1, 2, 5)) + val result_origin7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""") + val result_after7 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""") + checkAnswer(result_origin7, result_after7) + + sql("""ALTER TABLE range_table_bucket DROP PARTITION(5)""") + val carbonTable3 = CarbonMetadata.getInstance().getCarbonTable("default_range_table_bucket") + val partitionInfo3 = carbonTable3.getPartitionInfo(carbonTable.getFactTableName) + val partitionIds3 = partitionInfo3.getPartitionIds + val rangeInfo3 = partitionInfo3.getRangeInfo + assert(partitionIds3 == List(0, 1, 2).map(Integer.valueOf(_)).asJava) + assert(partitionInfo3.getMAX_PARTITION == 6) + assert(partitionInfo3.getNumPartitions == 3) + assert(rangeInfo3.get(0) == "2014/01/01") + assert(rangeInfo3.get(1) == "2015/01/01") + assert(rangeInfo3.size() == 2) + validateDataFiles("default_range_table_bucket", "0", Seq(0, 1, 2)) + val result_after8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket""") + val result_origin8 = sql("""select id, vin, logdate, phonenumber, country, area, salary from range_table_bucket_origin where logdate < '2017/01/01 00:00:00' or logdate >= '2018/01/01 00:00:00'""") + checkAnswer(result_after8, result_origin8) } test("test exception when alter partition and the values" http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 43d456f..838e5be 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -393,15 +393,14 @@ public final class CarbonDataMergerUtil { /** * To identify which all segments can be merged. * - * @param storeLocation * @param carbonLoadModel * @param compactionSize * @return */ - public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation, + public static List<LoadMetadataDetails> identifySegmentsToBeMerged( CarbonLoadModel carbonLoadModel, long compactionSize, List<LoadMetadataDetails> segments, CompactionType compactionType) { - + String storeLocation = carbonLoadModel.getStorePath(); List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments); sortSegments(sortedSegments); http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java new file mode 100644 index 0000000..9316c9f --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultProcessor.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.spliter; + +import java.util.List; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.block.SegmentProperties; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; +import org.apache.carbondata.core.datastore.row.CarbonRow; +import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.processing.model.CarbonLoadModel; +import org.apache.carbondata.processing.spliter.exception.AlterPartitionSliceException; +import org.apache.carbondata.processing.store.CarbonDataFileAttributes; +import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; +import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; +import org.apache.carbondata.processing.store.CarbonFactHandler; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +public class RowResultProcessor { + + private CarbonFactHandler dataHandler; + private SegmentProperties segmentProperties; + + private static final LogService LOGGER = + LogServiceFactory.getLogService(RowResultProcessor.class.getName()); + + + public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel, + SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) { + CarbonDataProcessorUtil.createLocations(tempStoreLocation); + this.segmentProperties = segProp; + String tableName = carbonTable.getFactTableName(); + CarbonFactDataHandlerModel carbonFactDataHandlerModel = + CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, + segProp, tableName, tempStoreLocation); + CarbonDataFileAttributes carbonDataFileAttributes = + new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()), + loadModel.getFactTimeStamp()); + carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); + carbonFactDataHandlerModel.setBucketId(bucketId); + //Note: set compaction flow just to convert decimal type + carbonFactDataHandlerModel.setCompactionFlow(true); + dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); + } + + public boolean execute(List<Object[]> resultList) { + boolean processStatus; + boolean isDataPresent = false; + + try { + if (!isDataPresent) { + dataHandler.initialise(); + isDataPresent = true; + } + for (Object[] row: resultList) { + addRow(row); + } + if (isDataPresent) + { + this.dataHandler.finish(); + } + processStatus = true; + } catch (AlterPartitionSliceException e) { + LOGGER.error(e, e.getMessage()); + LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage()); + processStatus = false; + } finally { + try { + if (isDataPresent) { + this.dataHandler.closeHandler(); + } + } catch (Exception e) { + LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage()); + processStatus = false; + } + } + return processStatus; + } + + private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException { + CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties); + try { + this.dataHandler.addDataToStore(row); + } catch (CarbonDataWriterException e) { + throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e); + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java deleted file mode 100644 index ea38a53..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/spliter/RowResultSpliterProcessor.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.processing.spliter; - -import java.util.List; - -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; -import org.apache.carbondata.core.datastore.block.SegmentProperties; -import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; -import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.processing.model.CarbonLoadModel; -import org.apache.carbondata.processing.spliter.exception.SliceSpliterException; -import org.apache.carbondata.processing.store.CarbonDataFileAttributes; -import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar; -import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; -import org.apache.carbondata.processing.store.CarbonFactHandler; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; - -public class RowResultSpliterProcessor { - - private CarbonFactHandler dataHandler; - private SegmentProperties segmentProperties; - - private static final LogService LOGGER = - LogServiceFactory.getLogService(RowResultSpliterProcessor.class.getName()); - - - public RowResultSpliterProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel, - SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) { - CarbonDataProcessorUtil.createLocations(tempStoreLocation); - this.segmentProperties = segProp; - String tableName = carbonTable.getFactTableName(); - CarbonFactDataHandlerModel carbonFactDataHandlerModel = - CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, - segProp, tableName, tempStoreLocation); - CarbonDataFileAttributes carbonDataFileAttributes = - new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()), - loadModel.getFactTimeStamp()); - carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes); - carbonFactDataHandlerModel.setBucketId(bucketId); - //Note: set compaction flow just to convert decimal type - carbonFactDataHandlerModel.setCompactionFlow(true); - dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); - } - - public boolean execute(List<Object[]> resultList) { - boolean splitStatus; - boolean isDataPresent = false; - - try { - if (!isDataPresent) { - dataHandler.initialise(); - isDataPresent = true; - } - for (Object[] row: resultList) { - addRow(row); - } - if (isDataPresent) - { - this.dataHandler.finish(); - } - splitStatus = true; - } catch (SliceSpliterException e) { - LOGGER.error(e, e.getMessage()); - LOGGER.error("Exception in split partition" + e.getMessage()); - splitStatus = false; - } finally { - try { - if (isDataPresent) { - this.dataHandler.closeHandler(); - } - } catch (Exception e) { - LOGGER.error("Exception while closing the handler in partition spliter" + e.getMessage()); - splitStatus = false; - } - } - return splitStatus; - } - - private void addRow(Object[] carbonTuple) throws SliceSpliterException { - CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties); - try { - this.dataHandler.addDataToStore(row); - } catch (CarbonDataWriterException e) { - throw new SliceSpliterException("Problem in writing rows when add/split the partition", e); - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java new file mode 100644 index 0000000..0e53a1f --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/AlterPartitionSliceException.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.processing.spliter.exception; + +import java.util.Locale; + +public class AlterPartitionSliceException extends Exception { + + /** + * default serial version ID. + */ + private static final long serialVersionUID = 1L; + + /** + * The Error message. + */ + private String msg = ""; + + /** + * Constructor + * + * @param msg The error message for this exception. + */ + public AlterPartitionSliceException(String msg) { + super(msg); + this.msg = msg; + } + + /** + * Constructor + * + * @param msg The error message for this exception. + */ + public AlterPartitionSliceException(String msg, Throwable t) { + super(msg, t); + this.msg = msg; + } + + /** + * This method is used to get the localized message. + * + * @param locale - A Locale object represents a specific geographical, + * political, or cultural region. + * @return - Localized error message. + */ + public String getLocalizedMessage(Locale locale) { + return ""; + } + + /** + * getLocalizedMessage + */ + @Override public String getLocalizedMessage() { + return super.getLocalizedMessage(); + } + + /** + * getMessage + */ + public String getMessage() { + return this.msg; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java b/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java deleted file mode 100644 index 17e679a..0000000 --- a/processing/src/main/java/org/apache/carbondata/processing/spliter/exception/SliceSpliterException.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.processing.spliter.exception; - -import java.util.Locale; - -public class SliceSpliterException extends Exception { - - /** - * default serial version ID. - */ - private static final long serialVersionUID = 1L; - - /** - * The Error message. - */ - private String msg = ""; - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public SliceSpliterException(String msg) { - super(msg); - this.msg = msg; - } - - /** - * Constructor - * - * @param msg The error message for this exception. - */ - public SliceSpliterException(String msg, Throwable t) { - super(msg, t); - this.msg = msg; - } - - /** - * This method is used to get the localized message. - * - * @param locale - A Locale object represents a specific geographical, - * political, or cultural region. - * @return - Localized error message. - */ - public String getLocalizedMessage(Locale locale) { - return ""; - } - - /** - * getLocalizedMessage - */ - @Override public String getLocalizedMessage() { - return super.getLocalizedMessage(); - } - - /** - * getMessage - */ - public String getMessage() { - return this.msg; - } -}