Repository: carbondata Updated Branches: refs/heads/master 4b98af22d -> 7edef8f4a
[CARBONDATA-2417] SDK writer goes to infinite wait when consumer thread is dead problem: SDK writer goes to infinite wait when consumer thread is dead root cause: when an exception happens at the consumer thread during write (due to a bad record), this message does not reach the producer (SDK writer). So, SDK keeps writing data assuming the consumer will consume it. But as the consumer is dead, the queue becomes full and queue.put() will be blocked forever. Solution: When the consumer is dead, call writer.close() forcefully and clear the queue, so that the blocked write will go on and skip writing the next batches. When writer.close() is called by the user, throw an exception indicating that the write has failed. This closes #2251 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7edef8f4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7edef8f4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7edef8f4 Branch: refs/heads/master Commit: 7edef8f4a780b1754ea4afe6794b4ba2ffa06794 Parents: 4b98af2 Author: ajantha-bhat <[email protected]> Authored: Sat Apr 28 18:17:35 2018 +0530 Committer: ravipesala <[email protected]> Committed: Tue May 1 20:20:02 2018 +0530 ---------------------------------------------------------------------- .../hadoop/api/CarbonTableOutputFormat.java | 9 ++++++--- .../TestNonTransactionalCarbonTable.scala | 20 ++++++++++++++++---- .../iterator/CarbonOutputIteratorWrapper.java | 16 +++++++++++++++- 3 files changed, 37 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7edef8f4/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java index 36ba02d..7050c8f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java @@ -243,7 +243,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje final String[] tempStoreLocations = getTempStoreLocations(taskAttemptContext); final CarbonOutputIteratorWrapper iteratorWrapper = new CarbonOutputIteratorWrapper(); final DataLoadExecutor dataLoadExecutor = new DataLoadExecutor(); - ExecutorService executorService = Executors.newFixedThreadPool(1, + final ExecutorService executorService = Executors.newFixedThreadPool(1, new CarbonThreadFactory("CarbonRecordWriter:" + loadModel.getTableName()));; // It should be started in new thread as the underlying iterator uses blocking queue. Future future = executorService.submit(new Thread() { @@ -252,9 +252,12 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje dataLoadExecutor .execute(loadModel, tempStoreLocations, new CarbonIterator[] { iteratorWrapper }); } catch (Exception e) { + executorService.shutdownNow(); dataLoadExecutor.close(); // clean up the folders and files created locally for data load operation TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false); + + iteratorWrapper.closeWriter(true); throw new RuntimeException(e); } } @@ -407,7 +410,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje } @Override public void close(TaskAttemptContext taskAttemptContext) throws InterruptedException { - iteratorWrapper.closeWriter(); + iteratorWrapper.closeWriter(false); try { future.get(); } catch (ExecutionException e) { @@ -419,7 +422,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje // clean up the folders and files created locally for data load 
operation TableProcessingOperations.deleteLocalDataLoadFolderLocation(loadModel, false, false); } - LOG.info("Closed partition writer task " + taskAttemptContext.getTaskAttemptID()); + LOG.info("Closed writer task " + taskAttemptContext.getTaskAttemptID()); } public CarbonLoadModel getLoadModel() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7edef8f4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala index 3adcec8..f1bda31 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala @@ -43,7 +43,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { "./src/test/resources/SparkCarbonFileFormat/WriterOutput/") .getCanonicalPath //getCanonicalPath gives path with \, so code expects /. Need to handle in code ? 
- writerPath = writerPath.replace("\\", "/"); + writerPath = writerPath.replace("\\", "/") def buildTestDataSingleFile(): Any = { FileUtils.deleteDirectory(new File(writerPath)) @@ -74,7 +74,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { def buildTestDataWithBadRecordFail(): Any = { FileUtils.deleteDirectory(new File(writerPath)) var options = Map("bAd_RECords_action" -> "FAIL").asJava - buildTestData(3, false, options) + buildTestData(15001, false, options) } def buildTestDataWithBadRecordIgnore(): Any = { @@ -127,7 +127,7 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } var i = 0 while (i < rows) { - if (options != null){ + if ((options != null) && (i < 3)) { // writing a bad record writer.write(Array[String]( "robot" + i, String.valueOf(i.toDouble / 2), "robot")) } else { @@ -141,7 +141,8 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { } writer.close() } catch { - case ex: Exception => None + case ex: Exception => throw new RuntimeException(ex) + case _ => None } } @@ -636,4 +637,15 @@ class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll { cleanTestData() } + + test("test huge data write with one batch having bad record") { + + val exception = + intercept[RuntimeException] { + buildTestDataWithBadRecordFail() + } + assert(exception.getMessage() + .contains("Data load failed due to bad record")) + + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7edef8f4/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java index 9229598..4067be1 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java @@ -47,6 +47,10 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> { private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10); public void write(Object[] row) throws InterruptedException { + if (close) { + // already might be closed forcefully + return; + } if (!loadBatch.addRow(row)) { loadBatch.readyRead(); queue.put(loadBatch); @@ -82,8 +86,18 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> { return readBatch.next(); } - public void closeWriter() { + public void closeWriter(boolean isForceClose) { + if (close) { + // already might be closed forcefully + return; + } try { + if (isForceClose) { + // unblock the queue.put on the other thread and clear the queue. + queue.clear(); + close = true; + return; + } loadBatch.readyRead(); if (loadBatch.size > 0) { queue.put(loadBatch);
