[CARBONDATA-903] Data load does not fail even though bad records exist in the data when unsafe sort or batch sort is used
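The change makes the parallel sorters propagate worker-thread failures back to the main data-load flow: each SortIteratorThread now reports exceptions to a shared ThreadStatusObserver, the duplicated checkError() logic moves into a new AbstractMergeSorter base class, and the sorter implementations call checkError() after submitting work and after the executor terminates, so a bad record under bad_records_action='FAIL' aborts the load instead of being silently swallowed. The batch sorter additionally marks its final merger with a stop flag so the consuming iterator fails fast. Below is a minimal sketch of the observer pattern as applied here, not the committed code; ThreadStatusObserver's internals are not part of this diff, so the notifyFailed() body shown (record the first failure, shut down the pool) is an assumption, and the class names are simplified.

----------------------------------------------------------------------
import java.util.concurrent.ExecutorService;

// Sketch of the shared observer the sort threads report failures to.
// The real class is org.apache.carbondata.processing.newflow.sort.impl
// .ThreadStatusObserver; its internals are assumed here, not shown in
// this diff.
class ThreadStatusObserverSketch {
  private final ExecutorService executorService;
  private volatile Throwable throwable;

  ThreadStatusObserverSketch(ExecutorService executorService) {
    this.executorService = executorService;
  }

  // Called from a sort thread's catch block, e.g. when a bad record is
  // rejected under bad_records_action='FAIL'.
  synchronized void notifyFailed(Throwable t) {
    if (throwable == null) {
      throwable = t;
      executorService.shutdownNow(); // assumption: stop sibling threads
    }
  }

  Throwable getThrowable() {
    return throwable;
  }
}

// Sketch of the new common base class: the sorters call checkError()
// after submitting the sort threads and again once the executor
// terminates, so a recorded failure surfaces on the main load thread.
abstract class MergeSorterSketch {
  protected ThreadStatusObserverSketch threadStatusObserver;

  protected void checkError() {
    Throwable t = threadStatusObserver.getThrowable();
    if (t != null) {
      // the committed code rethrows as CarbonDataLoadingException
      throw new RuntimeException(t);
    }
  }
}
----------------------------------------------------------------------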
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/53accb35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/53accb35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/53accb35 Branch: refs/heads/branch-1.1 Commit: 53accb35685fa959b5262a46518b6e9b0480439f Parents: 9efcacd Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Tue Apr 11 18:26:51 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 13 16:07:58 2017 +0530 ---------------------------------------------------------------------- .../DataLoadFailAllTypeSortTest.scala | 218 +++++++++++++++++++ .../newflow/sort/AbstractMergeSorter.java | 43 ++++ .../sort/impl/ParallelReadMergeSorterImpl.java | 18 +- ...arallelReadMergeSorterWithBucketingImpl.java | 16 +- .../UnsafeBatchParallelReadMergeSorterImpl.java | 43 +++- .../impl/UnsafeParallelReadMergeSorterImpl.java | 19 +- .../UnsafeSingleThreadFinalSortFilesMerger.java | 10 + 7 files changed, 333 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala new file mode 100644 index 0000000..478b4d3 --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.carbondata + +import java.io.File + +import org.apache.spark.sql.common.util.QueryTest +import org.apache.spark.sql.hive.HiveContext +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +/** + * Test class verifying that data load fails when bad records are present, + * covering parallel merge, unsafe sort, batch sort, and bucketed tables + * + */ +class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll { + var hiveContext: HiveContext = _ + + override def beforeAll: Unit = { + sql("drop table IF EXISTS data_pm") + sql("drop table IF EXISTS data_um") + sql("drop table IF EXISTS data_bm") + sql("drop table IF EXISTS data_bmf") + sql("drop table IF EXISTS data_tbm") + } + + test("dataload with parallel merge with bad_records_action='FAIL'") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + new File("./target/test/badRecords") + .getCanonicalPath) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); + sql("create table data_pm(name String, dob long, weight int) " + + "STORED BY 'org.apache.carbondata.format'") + val testData = s"$resourcesPath/badrecords/dummy.csv" + sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_pm""") + + + } catch { + case x: Throwable => { + assert(x.getMessage.contains("Data load failed due to bad record")) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + } + finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + } + } + + test("dataload with ENABLE_UNSAFE_SORT='true' with bad_records_action='FAIL'") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + new File("./target/test/badRecords") + .getCanonicalPath) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true"); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); + sql("create table data_um(name String, dob long, weight int) " + + "STORED BY 'org.apache.carbondata.format'") + val testData = s"$resourcesPath/badrecords/dummy.csv" + sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_um""") + + + } catch { + case x: Throwable => { + assert(x.getMessage.contains("Data load failed due to bad record")) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + } + finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false"); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + } + } + + test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='FAIL'") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + new File("./target/test/badRecords") + .getCanonicalPath) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + CarbonProperties.getInstance() + 
.addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true"); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); + sql("create table data_bm(name String, dob long, weight int) " + + "STORED BY 'org.apache.carbondata.format'") + val testData = s"$resourcesPath/badrecords/dummy.csv" + sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bm""") + + + } catch { + case x: Throwable => { + assert(x.getMessage.contains("Data load failed due to bad record")) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + } + finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false"); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + } + } + + test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='FORCE'") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + new File("./target/test/badRecords") + .getCanonicalPath) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true"); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FORCE"); + sql("create table data_bmf(name String, dob long, weight int) " + + "STORED BY 'org.apache.carbondata.format'") + val testData = s"$resourcesPath/badrecords/dummy.csv" + sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_bmf""") + + + } catch { + case x: Throwable => { + assert(false) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + } + finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false"); + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + } + } + + test("dataload with table bucketing with bad_records_action='FAIL'") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + new File("./target/test/badRecords") + .getCanonicalPath) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL"); + sql("create table data_tbm(name String, dob long, weight int) " + + "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " + + "'bucketcolumns'='name', 'tableName'='data_tbm')") + val testData = s"$resourcesPath/badrecords/dummy.csv" + sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_tbm""") + + + } catch { + case x: Throwable => { + assert(x.getMessage.contains("Data load failed due to bad record")) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } + } + finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT); + } + } + + // + override def afterAll { + sql("drop table IF EXISTS data_pm") + sql("drop table IF EXISTS data_um") + sql("drop table IF EXISTS data_bm") + sql("drop table IF EXISTS data_bmf") + sql("drop table IF 
EXISTS data_tbm") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java new file mode 100644 index 0000000..5179baa --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.processing.newflow.sort; + +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.sort.impl.ThreadStatusObserver; + +/** + * The class defines the common methods used in across various type of sort + */ +public abstract class AbstractMergeSorter implements Sorter { + /** + * instance of thread status observer + */ + protected ThreadStatusObserver threadStatusObserver; + + /** + * Below method will be used to check error in exception + */ + public void checkError() { + if (threadStatusObserver.getThrowable() != null) { + if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) { + throw (CarbonDataLoadingException) threadStatusObserver.getThrowable(); + } else { + throw new CarbonDataLoadingException(threadStatusObserver.getThrowable()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index ad96578..856b6ac 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -33,7 +33,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; -import 
org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger; @@ -47,7 +47,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; * First it sorts the data and write to temp files. These temp files will be merge sorted to get * final merge sort result. */ -public class ParallelReadMergeSorterImpl implements Sorter { +public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { private static final LogService LOGGER = LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName()); @@ -58,8 +58,6 @@ public class ParallelReadMergeSorterImpl implements Sorter { private ExecutorService executorService; - private ThreadStatusObserver threadStatusObserver; - private SingleThreadFinalSortFilesMerger finalMerger; private AtomicLong rowCounter; @@ -154,18 +152,6 @@ public class ParallelReadMergeSorterImpl implements Sorter { } /** - * Below method will be used to check error in exception - */ - private void checkError() { - if (threadStatusObserver.getThrowable() != null) { - if (threadStatusObserver.getThrowable() instanceof CarbonDataLoadingException) { - throw (CarbonDataLoadingException) threadStatusObserver.getThrowable(); - } else { - throw new CarbonDataLoadingException(threadStatusObserver.getThrowable()); - } - } - } - /** * Below method will be used to process data to next step */ private boolean processRowToNextStep(SortDataRows sortDataRows, SortParameters parameters) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index e3049d2..e5af1c6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -34,7 +34,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; -import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger; @@ -50,7 +50,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; * This step is specifically for bucketing, it sorts each bucket data separately and write to * temp files. 
*/ -public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { +public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorter { private static final LogService LOGGER = LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName()); @@ -100,17 +100,21 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { throw new CarbonDataLoadingException(e); } this.executorService = Executors.newFixedThreadPool(iterators.length); + this.threadStatusObserver = new ThreadStatusObserver(this.executorService); final int batchSize = CarbonProperties.getInstance().getBatchSize(); try { for (int i = 0; i < iterators.length; i++) { - executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter)); + executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter, + this.threadStatusObserver)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); processRowToNextStep(sortDataRows, sortParameters); } catch (Exception e) { + checkError(); throw new CarbonDataLoadingException("Problem while shutdown the server ", e); } + checkError(); try { intermediateFileMerger.finish(); } catch (CarbonDataWriterException e) { @@ -197,11 +201,14 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { private AtomicLong rowCounter; + private ThreadStatusObserver threadStatusObserver; + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows, - AtomicLong rowCounter) { + AtomicLong rowCounter, ThreadStatusObserver observer) { this.iterator = iterator; this.sortDataRows = sortDataRows; this.rowCounter = rowCounter; + this.threadStatusObserver = observer; } @Override public Void call() throws CarbonDataLoadingException { @@ -222,6 +229,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { } } catch (Exception e) { LOGGER.error(e); + this.threadStatusObserver.notifyFailed(e); throw new CarbonDataLoadingException(e); } return null; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index f3a60fc..a54410c 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -36,7 +36,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; import org.apache.carbondata.processing.newflow.row.CarbonSortBatch; -import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows; import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger; @@ -49,7 +49,7 @@ 
import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterE * It parallely reads data from array of iterates and do merge sort. * It sorts data in batches and send to the next step. */ -public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter { +public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter { private static final LogService LOGGER = LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName()); @@ -72,18 +72,22 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter { @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] iterators) throws CarbonDataLoadingException { this.executorService = Executors.newFixedThreadPool(iterators.length); + this.threadStatusObserver = new ThreadStatusObserver(this.executorService); int batchSize = CarbonProperties.getInstance().getBatchSize(); - final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length); + final SortBatchHolder sortBatchHolder = new SortBatchHolder(sortParameters, iterators.length, + this.threadStatusObserver); try { for (int i = 0; i < iterators.length; i++) { - executorService - .submit(new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter)); + executorService.submit( + new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, rowCounter, + this.threadStatusObserver)); } } catch (Exception e) { + checkError(); throw new CarbonDataLoadingException("Problem while shutdown the server ", e); } - + checkError(); // Creates the iterator to read from merge sorter. Iterator<CarbonSortBatch> batchIterator = new CarbonIterator<CarbonSortBatch>() { @@ -120,12 +124,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter { private AtomicLong rowCounter; + private ThreadStatusObserver threadStatusObserver; + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortBatchHolder sortDataRows, - int batchSize, AtomicLong rowCounter) { + int batchSize, AtomicLong rowCounter, ThreadStatusObserver threadStatusObserver) { this.iterator = iterator; this.sortDataRows = sortDataRows; this.buffer = new Object[batchSize][]; this.rowCounter = rowCounter; + this.threadStatusObserver = threadStatusObserver; } @Override public Void call() throws CarbonDataLoadingException { @@ -152,6 +159,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter { } } catch (Exception e) { LOGGER.error(e); + this.threadStatusObserver.notifyFailed(e); throw new CarbonDataLoadingException(e); } finally { sortDataRows.finishThread(); @@ -176,10 +184,14 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter { private AtomicInteger iteratorCount; - public SortBatchHolder(SortParameters sortParameters, int numberOfThreads) { + private ThreadStatusObserver threadStatusObserver; + + public SortBatchHolder(SortParameters sortParameters, int numberOfThreads, + ThreadStatusObserver threadStatusObserver) { this.sortParameters = sortParameters; this.iteratorCount = new AtomicInteger(numberOfThreads); this.mergerQueue = new LinkedBlockingQueue<>(); + this.threadStatusObserver = threadStatusObserver; createSortDataRows(); } @@ -197,7 +209,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter { @Override public UnsafeSingleThreadFinalSortFilesMerger next() { try { - return mergerQueue.take(); + UnsafeSingleThreadFinalSortFilesMerger unsafeSingleThreadFinalSortFilesMerger = + mergerQueue.take(); + if 
(unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) { + throw new RuntimeException(threadStatusObserver.getThrowable()); + } + return unsafeSingleThreadFinalSortFilesMerger; } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -209,6 +226,14 @@ public void finish() { try { + // if the mergerQueue is empty and a CarbonDataLoadingException has occurred, + // then set the stop-process flag on the final merger instance + if (mergerQueue.isEmpty() && threadStatusObserver != null + && threadStatusObserver.getThrowable() != null && threadStatusObserver + .getThrowable() instanceof CarbonDataLoadingException) { + finalMerger.setStopProcess(true); + mergerQueue.offer(finalMerger); + } processRowToNextStep(sortDataRow, sortParameters); unsafeIntermediateFileMerger.finish(); List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java index 18cf314..0caafec 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java @@ -34,7 +34,7 @@ import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; -import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows; import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger; @@ -49,7 +49,7 @@ import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; * First it sorts the data and write to temp files. These temp files will be merge sorted to get * final merge sort result. 
*/ -public class UnsafeParallelReadMergeSorterImpl implements Sorter { +public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter { private static final LogService LOGGER = LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName()); @@ -92,18 +92,22 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { throw new CarbonDataLoadingException(e); } this.executorService = Executors.newFixedThreadPool(iterators.length); + this.threadStatusObserver = new ThreadStatusObserver(this.executorService); try { for (int i = 0; i < iterators.length; i++) { - executorService - .submit(new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter)); + executorService.submit( + new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter, + this.threadStatusObserver)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); processRowToNextStep(sortDataRow, sortParameters); } catch (Exception e) { + checkError(); throw new CarbonDataLoadingException("Problem while shutdown the server ", e); } + checkError(); try { unsafeIntermediateFileMerger.finish(); List<UnsafeCarbonRowPage> rowPages = unsafeIntermediateFileMerger.getRowPages(); @@ -182,12 +186,16 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { private AtomicLong rowCounter; + private ThreadStatusObserver threadStatusObserver; + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, - UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) { + UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter, + ThreadStatusObserver threadStatusObserver) { this.iterator = iterator; this.sortDataRows = sortDataRows; this.buffer = new Object[batchSize][]; this.rowCounter = rowCounter; + this.threadStatusObserver = threadStatusObserver; } @Override public Void call() throws CarbonDataLoadingException { @@ -208,6 +216,7 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { } } catch (Exception e) { LOGGER.error(e); + this.threadStatusObserver.notifyFailed(e); throw new CarbonDataLoadingException(e); } return null; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java index b98a072..10c5191 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java @@ -80,6 +80,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec private String tableName; + private boolean isStopProcess; + public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) { this.parameters = parameters; // set measure and dimension count @@ -305,4 +307,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends CarbonIterator<Objec recordHolderHeapLocal = null; } } + + public boolean isStopProcess() { + return isStopProcess; + } + + public void setStopProcess(boolean stopProcess) { + 
isStopProcess = stopProcess; + } }
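
In the batch-sort path the consumer blocks on mergerQueue.take(), so a failure must also wake that thread: finish() detects a recorded CarbonDataLoadingException while the queue is still empty, sets the new stop-process flag on the final merger, and offers it to the queue; next() then sees the flag and rethrows the observed failure. Below is a self-contained sketch of that handshake under simplified types (the queue element stands in for UnsafeSingleThreadFinalSortFilesMerger, and the failure field stands in for the ThreadStatusObserver lookup); it illustrates the idea rather than reproducing the committed classes.

----------------------------------------------------------------------
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in for UnsafeSingleThreadFinalSortFilesMerger: only the
// stop-process flag added by this commit is modelled.
class FinalMergerSketch {
  private volatile boolean isStopProcess;

  boolean isStopProcess() {
    return isStopProcess;
  }

  void setStopProcess(boolean stopProcess) {
    isStopProcess = stopProcess;
  }
}

// Stand-in for SortBatchHolder's producer/consumer handshake.
class SortBatchHolderSketch {
  private final BlockingQueue<FinalMergerSketch> mergerQueue = new LinkedBlockingQueue<>();
  private final FinalMergerSketch finalMerger = new FinalMergerSketch();
  private volatile Throwable failure; // recorded via the observer in the real code

  // Producer side (finish()): if a load error was recorded and nothing
  // reached the queue, enqueue a "poisoned" merger so the consumer
  // blocked in take() wakes up instead of hanging forever.
  void finish() {
    if (mergerQueue.isEmpty() && failure != null) {
      finalMerger.setStopProcess(true);
      mergerQueue.offer(finalMerger);
    }
  }

  // Consumer side (next()): a poisoned merger means the load failed.
  FinalMergerSketch next() throws InterruptedException {
    FinalMergerSketch merger = mergerQueue.take();
    if (merger.isStopProcess()) {
      throw new RuntimeException(failure);
    }
    return merger;
  }
}
----------------------------------------------------------------------

Enqueuing a flagged merger, rather than interrupting the consumer, keeps the queue semantics unchanged on the success path and needs no synchronization beyond the queue itself.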