[CARBONDATA-2603] Fix: error handling during reader build failure Problem: When CarbonReaderBuilder.build() fails because of a problem such as an invalid projection that causes query model creation to fail, the blocklet datamap is not cleared for that table. As a result, the next reader instance uses the stale blocklet datamap, which causes an error.
Solution: Clear the blocklet datamap if the reader build is failed. This closes #2368 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e07b832f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e07b832f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e07b832f Branch: refs/heads/branch-1.4 Commit: e07b832f557d91e9663feb1457bc44cea5121966 Parents: 288aba1 Author: ajantha-bhat <ajanthab...@gmail.com> Authored: Mon Jun 11 19:17:33 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jun 12 20:36:59 2018 +0530 ---------------------------------------------------------------------- .../sdk/file/CarbonReaderBuilder.java | 39 ++++++++----- .../carbondata/sdk/file/CarbonReaderTest.java | 61 ++++++++++++++++++++ 2 files changed, 84 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e07b832f/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index 83cb34e..ebee41a 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -24,6 +24,7 @@ import java.util.Objects; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.common.annotations.InterfaceStability; +import org.apache.carbondata.core.datamap.DataMapStoreManager; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.scan.expression.Expression; @@ -200,23 
+201,29 @@ public class CarbonReaderBuilder { format.setColumnProjection(job.getConfiguration(), projectionColumns); } - final List<InputSplit> splits = - format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); - - List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size()); - for (InputSplit split : splits) { - TaskAttemptContextImpl attempt = - new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); - RecordReader reader = format.createRecordReader(split, attempt); - try { - reader.initialize(split, attempt); - readers.add(reader); - } catch (Exception e) { - reader.close(); - throw e; + try { + final List<InputSplit> splits = + format.getSplits(new JobContextImpl(job.getConfiguration(), new JobID())); + + List<RecordReader<Void, T>> readers = new ArrayList<>(splits.size()); + for (InputSplit split : splits) { + TaskAttemptContextImpl attempt = + new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID()); + RecordReader reader = format.createRecordReader(split, attempt); + try { + reader.initialize(split, attempt); + readers.add(reader); + } catch (Exception e) { + reader.close(); + throw e; + } } + return new CarbonReader<>(readers); + } catch (Exception ex) { + // Clear the datamap cache as it can get added in getSplits() method + DataMapStoreManager.getInstance() + .clearDataMaps(table.getAbsoluteTableIdentifier()); + throw ex; } - - return new CarbonReader<>(readers); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e07b832f/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java index fb2e2bc..2bc4b1f 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java +++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java @@ -353,6 +353,67 @@ public class CarbonReaderTest extends TestCase { } @Test + public void testWriteAndReadFilesWithReaderBuildFail() throws IOException, InterruptedException { + String path1 = "./testWriteFiles"; + String path2 = "./testWriteFiles2"; + FileUtils.deleteDirectory(new File(path1)); + FileUtils.deleteDirectory(new File(path2)); + + Field[] fields = new Field[] { new Field("c1", "string"), + new Field("c2", "int") }; + Schema schema = new Schema(fields); + CarbonWriterBuilder builder = CarbonWriter.builder(); + + CarbonWriter carbonWriter = null; + try { + carbonWriter = builder.outputPath(path1).isTransactionalTable(false).uniqueIdentifier(12345) + .buildWriterForCSVInput(schema); + } catch (InvalidLoadOptionException e) { + e.printStackTrace(); + } + carbonWriter.write(new String[] { "MNO", "100" }); + carbonWriter.close(); + + Field[] fields1 = new Field[] { new Field("p1", "string"), + new Field("p2", "int") }; + Schema schema1 = new Schema(fields1); + CarbonWriterBuilder builder1 = CarbonWriter.builder(); + + CarbonWriter carbonWriter1 = null; + try { + carbonWriter1 = builder1.outputPath(path2).isTransactionalTable(false).uniqueIdentifier(12345) + .buildWriterForCSVInput(schema1); + } catch (InvalidLoadOptionException e) { + e.printStackTrace(); + } + carbonWriter1.write(new String[] { "PQR", "200" }); + carbonWriter1.close(); + + try { + CarbonReader reader = + CarbonReader.builder(path1, "_temp"). 
+ projection(new String[] { "c1", "c3" }) + .isTransactionalTable(false).build(); + } catch (Exception e){ + System.out.println("Success"); + } + CarbonReader reader1 = + CarbonReader.builder(path2, "_temp1") + .projection(new String[] { "p1", "p2" }) + .isTransactionalTable(false).build(); + + while (reader1.hasNext()) { + Object[] row1 = (Object[]) reader1.readNextRow(); + System.out.println(row1[0]); + System.out.println(row1[1]); + } + reader1.close(); + + FileUtils.deleteDirectory(new File(path1)); + FileUtils.deleteDirectory(new File(path2)); + } + + @Test public void testReadColumnTwice() throws IOException, InterruptedException { String path = "./testWriteFiles"; FileUtils.deleteDirectory(new File(path));