[CARBONDATA-903] Data load does not fail even though bad records exist in the data when unsafe sort or batch sort is used


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/53accb35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/53accb35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/53accb35

Branch: refs/heads/branch-1.1
Commit: 53accb35685fa959b5262a46518b6e9b0480439f
Parents: 9efcacd
Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com>
Authored: Tue Apr 11 18:26:51 2017 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Apr 13 16:07:58 2017 +0530

----------------------------------------------------------------------
 .../DataLoadFailAllTypeSortTest.scala           | 218 +++++++++++++++++++
 .../newflow/sort/AbstractMergeSorter.java       |  43 ++++
 .../sort/impl/ParallelReadMergeSorterImpl.java  |  18 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  16 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |  43 +++-
 .../impl/UnsafeParallelReadMergeSorterImpl.java |  19 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  10 +
 7 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
new file mode 100644
index 0000000..478b4d3
--- /dev/null
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.carbondata
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.hive.HiveContext
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+ * Verifies that a data load with BAD_RECORDS_ACTION='FAIL' fails when the input
+ * contains bad records, for each sort flavour (parallel merge sort, unsafe sort,
+ * batch sort and bucketing), and that the same input loads with 'FORCE'.
+ */
+class DataLoadFailAllTypeSortTest extends QueryTest with BeforeAndAfterAll {
+  var hiveContext: HiveContext = _
+
+  override def beforeAll: Unit = {
+    sql("drop table IF EXISTS data_pm")
+    sql("drop table IF EXISTS data_um")
+    sql("drop table IF EXISTS data_bm")
+    sql("drop table IF EXISTS data_bmf")
+    sql("drop table IF EXISTS data_tbm")
+  }
+
+  /**
+   * Points the bad-records location at the test target directory, switches the
+   * timestamp format to the one that makes dummy.csv contain bad records, and
+   * sets the given bad records action.
+   */
+  private def setUpBadRecordProperties(badRecordsAction: String): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+        new File("./target/test/badRecords").getCanonicalPath)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, badRecordsAction)
+  }
+
+  /**
+   * Restores the properties changed by setUpBadRecordProperties. It is called from
+   * `finally` so that a failing test cannot leak settings into later tests.
+   */
+  private def resetBadRecordProperties(): Unit = {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+  }
+
+  /** Loads the bad-record CSV into `tableName`. */
+  private def loadBadRecordData(tableName: String): Unit = {
+    val testData = s"$resourcesPath/badrecords/dummy.csv"
+    sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table $tableName""")
+  }
+
+  /**
+   * Asserts that loading the bad-record CSV into `tableName` throws with the
+   * bad-record failure message. Unlike a bare try/catch, `intercept` fails the
+   * test when no exception is thrown at all.
+   */
+  private def assertLoadFailsDueToBadRecord(tableName: String): Unit = {
+    val exception = intercept[Throwable] {
+      loadBadRecordData(tableName)
+    }
+    assert(Option(exception.getMessage).exists(_.contains("Data load failed due to bad record")))
+  }
+
+  test("dataload with parallel merge with bad_records_action='FAIL'") {
+    try {
+      setUpBadRecordProperties("FAIL")
+      sql("create table data_pm(name String, dob long, weight int) " +
+          "STORED BY 'org.apache.carbondata.format'")
+      assertLoadFailsDueToBadRecord("data_pm")
+    } finally {
+      resetBadRecordProperties()
+    }
+  }
+
+  test("dataload with ENABLE_UNSAFE_SORT='true' with bad_records_action='FAIL'") {
+    try {
+      setUpBadRecordProperties("FAIL")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+      sql("create table data_um(name String, dob long, weight int) " +
+          "STORED BY 'org.apache.carbondata.format'")
+      assertLoadFailsDueToBadRecord("data_um")
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+      resetBadRecordProperties()
+    }
+  }
+
+  test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='FAIL'") {
+    try {
+      setUpBadRecordProperties("FAIL")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true")
+      sql("create table data_bm(name String, dob long, weight int) " +
+          "STORED BY 'org.apache.carbondata.format'")
+      assertLoadFailsDueToBadRecord("data_bm")
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false")
+      resetBadRecordProperties()
+    }
+  }
+
+  test("dataload with LOAD_USE_BATCH_SORT='true' with bad_records_action='FORCE'") {
+    try {
+      setUpBadRecordProperties("FORCE")
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "true")
+      sql("create table data_bmf(name String, dob long, weight int) " +
+          "STORED BY 'org.apache.carbondata.format'")
+      // With FORCE the load is expected to succeed despite the bad records; any
+      // exception propagates and fails the test with its original cause.
+      loadBadRecordData("data_bmf")
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.LOAD_USE_BATCH_SORT, "false")
+      resetBadRecordProperties()
+    }
+  }
+
+  test("dataload with table bucketing with bad_records_action='FAIL'") {
+    try {
+      setUpBadRecordProperties("FAIL")
+      sql("create table data_tbm(name String, dob long, weight int) " +
+          "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " +
+          "'bucketcolumns'='name', 'tableName'='data_tbm')")
+      assertLoadFailsDueToBadRecord("data_tbm")
+    } finally {
+      resetBadRecordProperties()
+    }
+  }
+
+  override def afterAll {
+    sql("drop table IF EXISTS data_pm")
+    sql("drop table IF EXISTS data_um")
+    sql("drop table IF EXISTS data_bm")
+    sql("drop table IF EXISTS data_bmf")
+    sql("drop table IF EXISTS data_tbm")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
new file mode 100644
index 0000000..5179baa
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/AbstractMergeSorter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.newflow.sort;
+
+import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.sort.impl.ThreadStatusObserver;
+
+/**
+ * The class defines the common methods used in across various type of sort
+ */
+public abstract class AbstractMergeSorter implements Sorter {
+  /**
+   * instance of thread status observer
+   */
+  protected ThreadStatusObserver threadStatusObserver;
+
+  /**
+   * Below method will be used to check error in exception
+   */
+  public void checkError() {
+    if (threadStatusObserver.getThrowable() != null) {
+      if (threadStatusObserver.getThrowable() instanceof 
CarbonDataLoadingException) {
+        throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
+      } else {
+        throw new 
CarbonDataLoadingException(threadStatusObserver.getThrowable());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index ad96578..856b6ac 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -33,7 +33,7 @@ import 
org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
 import 
org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
 import 
org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
@@ -47,7 +47,7 @@ import 
org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
  * First it sorts the data and write to temp files. These temp files will be 
merge sorted to get
  * final merge sort result.
  */
-public class ParallelReadMergeSorterImpl implements Sorter {
+public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
   private static final LogService LOGGER =
       
LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
@@ -58,8 +58,6 @@ public class ParallelReadMergeSorterImpl implements Sorter {
 
   private ExecutorService executorService;
 
-  private ThreadStatusObserver threadStatusObserver;
-
   private SingleThreadFinalSortFilesMerger finalMerger;
 
   private AtomicLong rowCounter;
@@ -154,18 +152,6 @@ public class ParallelReadMergeSorterImpl implements Sorter 
{
   }
 
   /**
-   * Below method will be used to check error in exception
-   */
-  private void checkError() {
-    if (threadStatusObserver.getThrowable() != null) {
-      if (threadStatusObserver.getThrowable() instanceof 
CarbonDataLoadingException) {
-        throw (CarbonDataLoadingException) threadStatusObserver.getThrowable();
-      } else {
-        throw new 
CarbonDataLoadingException(threadStatusObserver.getThrowable());
-      }
-    }
-  }
-  /**
    * Below method will be used to process data to next step
    */
   private boolean processRowToNextStep(SortDataRows sortDataRows, 
SortParameters parameters)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index e3049d2..e5af1c6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -34,7 +34,7 @@ import 
org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
 import 
org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
 import 
org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
@@ -50,7 +50,7 @@ import 
org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
  * This step is specifically for bucketing, it sorts each bucket data 
separately and write to
  * temp files.
  */
-public class ParallelReadMergeSorterWithBucketingImpl implements Sorter {
+public class ParallelReadMergeSorterWithBucketingImpl extends 
AbstractMergeSorter {
 
   private static final LogService LOGGER =
       
LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName());
@@ -100,17 +100,21 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
       throw new CarbonDataLoadingException(e);
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(new SortIteratorThread(iterators[i], 
sortDataRows, rowCounter));
+        executorService.submit(new SortIteratorThread(iterators[i], 
sortDataRows, rowCounter,
+            this.threadStatusObserver));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
       processRowToNextStep(sortDataRows, sortParameters);
     } catch (Exception e) {
+      checkError();
       throw new CarbonDataLoadingException("Problem while shutdown the server 
", e);
     }
+    checkError();
     try {
       intermediateFileMerger.finish();
     } catch (CarbonDataWriterException e) {
@@ -197,11 +201,14 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
 
     private AtomicLong rowCounter;
 
+    private ThreadStatusObserver threadStatusObserver;
+
     public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
SortDataRows[] sortDataRows,
-        AtomicLong rowCounter) {
+        AtomicLong rowCounter, ThreadStatusObserver observer) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
       this.rowCounter = rowCounter;
+      this.threadStatusObserver = observer;
     }
 
     @Override public Void call() throws CarbonDataLoadingException {
@@ -222,6 +229,7 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
         }
       } catch (Exception e) {
         LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
         throw new CarbonDataLoadingException(e);
       }
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index f3a60fc..a54410c 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -36,7 +36,7 @@ import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
 import org.apache.carbondata.processing.newflow.row.CarbonSortBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
 import 
org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
 import 
org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
@@ -49,7 +49,7 @@ import 
org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterE
  * It parallely reads data from array of iterates and do merge sort.
  * It sorts data in batches and send to the next step.
  */
-public class UnsafeBatchParallelReadMergeSorterImpl implements Sorter {
+public class UnsafeBatchParallelReadMergeSorterImpl extends 
AbstractMergeSorter {
 
   private static final LogService LOGGER =
       
LogServiceFactory.getLogService(UnsafeBatchParallelReadMergeSorterImpl.class.getName());
@@ -72,18 +72,22 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
implements Sorter {
   @Override public Iterator<CarbonRowBatch>[] sort(Iterator<CarbonRowBatch>[] 
iterators)
       throws CarbonDataLoadingException {
     this.executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
     int batchSize = CarbonProperties.getInstance().getBatchSize();
-    final SortBatchHolder sortBatchHolder = new 
SortBatchHolder(sortParameters, iterators.length);
+    final SortBatchHolder sortBatchHolder = new 
SortBatchHolder(sortParameters, iterators.length,
+        this.threadStatusObserver);
 
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService
-            .submit(new SortIteratorThread(iterators[i], sortBatchHolder, 
batchSize, rowCounter));
+        executorService.submit(
+            new SortIteratorThread(iterators[i], sortBatchHolder, batchSize, 
rowCounter,
+                this.threadStatusObserver));
       }
     } catch (Exception e) {
+      checkError();
       throw new CarbonDataLoadingException("Problem while shutdown the server 
", e);
     }
-
+    checkError();
     // Creates the iterator to read from merge sorter.
     Iterator<CarbonSortBatch> batchIterator = new 
CarbonIterator<CarbonSortBatch>() {
 
@@ -120,12 +124,15 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
implements Sorter {
 
     private AtomicLong rowCounter;
 
+    private ThreadStatusObserver threadStatusObserver;
+
     public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
SortBatchHolder sortDataRows,
-        int batchSize, AtomicLong rowCounter) {
+        int batchSize, AtomicLong rowCounter, ThreadStatusObserver 
threadStatusObserver) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
       this.buffer = new Object[batchSize][];
       this.rowCounter = rowCounter;
+      this.threadStatusObserver = threadStatusObserver;
     }
 
     @Override public Void call() throws CarbonDataLoadingException {
@@ -152,6 +159,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
implements Sorter {
         }
       } catch (Exception e) {
         LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
         throw new CarbonDataLoadingException(e);
       } finally {
         sortDataRows.finishThread();
@@ -176,10 +184,14 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
implements Sorter {
 
     private AtomicInteger iteratorCount;
 
-    public SortBatchHolder(SortParameters sortParameters, int numberOfThreads) 
{
+    private ThreadStatusObserver threadStatusObserver;
+
+    public SortBatchHolder(SortParameters sortParameters, int numberOfThreads,
+        ThreadStatusObserver threadStatusObserver) {
       this.sortParameters = sortParameters;
       this.iteratorCount = new AtomicInteger(numberOfThreads);
       this.mergerQueue = new LinkedBlockingQueue<>();
+      this.threadStatusObserver = threadStatusObserver;
       createSortDataRows();
     }
 
@@ -197,7 +209,12 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
implements Sorter {
 
     @Override public UnsafeSingleThreadFinalSortFilesMerger next() {
       try {
-        return mergerQueue.take();
+        UnsafeSingleThreadFinalSortFilesMerger 
unsafeSingleThreadFinalSortFilesMerger =
+            mergerQueue.take();
+        if (unsafeSingleThreadFinalSortFilesMerger.isStopProcess()) {
+          throw new RuntimeException(threadStatusObserver.getThrowable());
+        }
+        return unsafeSingleThreadFinalSortFilesMerger;
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
@@ -209,6 +226,14 @@ public class UnsafeBatchParallelReadMergeSorterImpl 
implements Sorter {
 
     public void finish() {
       try {
+        // if the mergerQue is empty and some CarbonDataLoadingException 
exception has occurred
+        // then set stop process to true in the finalmerger instance
+        if (mergerQueue.isEmpty() && threadStatusObserver != null
+            && threadStatusObserver.getThrowable() != null && 
threadStatusObserver
+            .getThrowable() instanceof CarbonDataLoadingException) {
+          finalMerger.setStopProcess(true);
+          mergerQueue.offer(finalMerger);
+        }
         processRowToNextStep(sortDataRow, sortParameters);
         unsafeIntermediateFileMerger.finish();
         List<UnsafeCarbonRowPage> rowPages = 
unsafeIntermediateFileMerger.getRowPages();

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 18cf314..0caafec 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -34,7 +34,7 @@ import 
org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
+import org.apache.carbondata.processing.newflow.sort.AbstractMergeSorter;
 import 
org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
 import 
org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
@@ -49,7 +49,7 @@ import 
org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
  * First it sorts the data and write to temp files. These temp files will be 
merge sorted to get
  * final merge sort result.
  */
-public class UnsafeParallelReadMergeSorterImpl implements Sorter {
+public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
 
   private static final LogService LOGGER =
       
LogServiceFactory.getLogService(UnsafeParallelReadMergeSorterImpl.class.getName());
@@ -92,18 +92,22 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
       throw new CarbonDataLoadingException(e);
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length);
+    this.threadStatusObserver = new ThreadStatusObserver(this.executorService);
 
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService
-            .submit(new SortIteratorThread(iterators[i], sortDataRow, 
batchSize, rowCounter));
+        executorService.submit(
+            new SortIteratorThread(iterators[i], sortDataRow, batchSize, 
rowCounter,
+                this.threadStatusObserver));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
       processRowToNextStep(sortDataRow, sortParameters);
     } catch (Exception e) {
+      checkError();
       throw new CarbonDataLoadingException("Problem while shutdown the server 
", e);
     }
+    checkError();
     try {
       unsafeIntermediateFileMerger.finish();
       List<UnsafeCarbonRowPage> rowPages = 
unsafeIntermediateFileMerger.getRowPages();
@@ -182,12 +186,16 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
 
     private AtomicLong rowCounter;
 
+    private ThreadStatusObserver threadStatusObserver;
+
     public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
-        UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) 
{
+        UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter,
+        ThreadStatusObserver threadStatusObserver) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
       this.buffer = new Object[batchSize][];
       this.rowCounter = rowCounter;
+      this.threadStatusObserver = threadStatusObserver;
     }
 
     @Override public Void call() throws CarbonDataLoadingException {
@@ -208,6 +216,7 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
         }
       } catch (Exception e) {
         LOGGER.error(e);
+        this.threadStatusObserver.notifyFailed(e);
         throw new CarbonDataLoadingException(e);
       }
       return null;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/53accb35/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
index b98a072..10c5191 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeSingleThreadFinalSortFilesMerger.java
@@ -80,6 +80,8 @@ public class UnsafeSingleThreadFinalSortFilesMerger extends 
CarbonIterator<Objec
 
   private String tableName;
 
+  private boolean isStopProcess;
+
   public UnsafeSingleThreadFinalSortFilesMerger(SortParameters parameters) {
     this.parameters = parameters;
     // set measure and dimension count
@@ -305,4 +307,12 @@ public class UnsafeSingleThreadFinalSortFilesMerger 
extends CarbonIterator<Objec
       recordHolderHeapLocal = null;
     }
   }
+
+  /**
+   * Tells whether this merger is a stop marker rather than a source of rows.
+   * When true, the consumer that takes this merger from the merger queue raises
+   * the recorded worker failure instead of reading rows from it.
+   *
+   * @return true if processing must stop because an error occurred
+   */
+  public boolean isStopProcess() {
+    return isStopProcess;
+  }
+
+  /**
+   * Marks or unmarks this merger as a stop marker. It is set to true when a worker
+   * failure has been recorded and no merger is queued, so that the waiting consumer
+   * is woken up and the failure is surfaced.
+   *
+   * @param stopProcess true to signal that processing must stop
+   */
+  public void setStopProcess(boolean stopProcess) {
+    isStopProcess = stopProcess;
+  }
 }

Reply via email to