Repository: carbondata Updated Branches: refs/heads/master ff10bbb05 -> 6e77f2b9d
Fix concurrent testcase random failure Fix IUDConcurrentTest to run sql concurrently in correct order This closes #1800 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6e77f2b9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6e77f2b9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6e77f2b9 Branch: refs/heads/master Commit: 6e77f2b9d96ed32898111c3fc7551a9dde3f8cf4 Parents: ff10bbb Author: Jacky Li <[email protected]> Authored: Mon Jan 15 09:49:38 2018 +0800 Committer: chenliang613 <[email protected]> Committed: Thu Jan 18 10:06:30 2018 +0800 ---------------------------------------------------------------------- .../hadoop/ft/CarbonOutputMapperTest.java | 115 ---------- .../hadoop/ft/CarbonTableInputFormatTest.java | 167 +++++++++++++- .../hadoop/ft/CarbonTableInputMapperTest.java | 219 ------------------- .../hadoop/ft/CarbonTableOutputFormatTest.java | 120 ++++++++++ .../hadoop/test/util/StoreCreator.java | 18 +- .../spark/testsuite/iud/IUDConcurrentTest.scala | 114 ++++++++-- .../store/writer/AbstractFactDataWriter.java | 3 +- 7 files changed, 388 insertions(+), 368 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java deleted file mode 100644 index 22a6e53..0000000 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonOutputMapperTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.carbondata.hadoop.ft; - -import java.io.File; -import java.io.FilenameFilter; -import java.io.IOException; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; -import org.apache.carbondata.hadoop.test.util.StoreCreator; -import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; -import org.apache.carbondata.processing.loading.model.CarbonLoadModel; - -import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.junit.Test; - -public class CarbonOutputMapperTest extends TestCase { - - CarbonLoadModel carbonLoadModel; - - // changed setUp to static init block to avoid un wanted multiple time store 
creation - static { - CarbonProperties.getInstance(). - addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); - } - - - @Test public void testOutputFormat() throws Exception { - runJob(""); - String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0"); - File file = new File(segmentPath); - assert (file.exists()); - File[] listFiles = file.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.endsWith(".carbondata") || - name.endsWith(".carbonindex") || - name.endsWith(".carbonindexmerge"); - } - }); - - assert (listFiles.length == 2); - } - - - @Override public void tearDown() throws Exception { - super.tearDown(); - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true"); - } - - @Override public void setUp() throws Exception { - super.setUp(); - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false"); - carbonLoadModel = StoreCreator.getCarbonLoadModel(); - } - - public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> { - - @Override protected void map(NullWritable key, StringArrayWritable value, Context context) - throws IOException, InterruptedException { - context.write(key, value); - } - } - - private void runJob(String outPath) throws Exception { - Configuration configuration = new Configuration(); - configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath()); - Job job = Job.getInstance(configuration); - job.setJarByClass(CarbonOutputMapperTest.class); - job.setOutputKeyClass(NullWritable.class); - job.setOutputValueClass(StringArrayWritable.class); - job.setMapperClass(Map.class); - job.setNumReduceTasks(0); - - FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath())); - CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), carbonLoadModel); - 
CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()); - CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true); - job.setInputFormatClass(CSVInputFormat.class); - job.setOutputFormatClass(CarbonTableOutputFormat.class); - CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1")); - FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1")); - job.getConfiguration().set("outpath", outPath); - job.getConfiguration().set("query.id", String.valueOf(System.nanoTime())); - job.waitForCompletion(true); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java index 1df8a1a..2f029ab 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputFormatTest.java @@ -17,28 +17,41 @@ package org.apache.carbondata.hadoop.ft; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.File; import java.io.FileFilter; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; import java.util.List; import java.util.UUID; import junit.framework.TestCase; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.LiteralExpression; 
import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.CarbonProjection; import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; import org.apache.carbondata.hadoop.test.util.StoreCreator; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -49,7 +62,11 @@ public class CarbonTableInputFormatTest { static { CarbonProperties.getInstance(). 
addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); - StoreCreator.createCarbonStore(); + try { + StoreCreator.createCarbonStore(); + } catch (Exception e) { + Assert.fail("create table failed: " + e.getMessage()); + } } @Test public void testGetFilteredSplits() throws Exception { @@ -99,4 +116,152 @@ public class CarbonTableInputFormatTest { Assert.assertTrue(splits != null && splits.size() == 1); } + @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception { + try { + String outPath = "target/output"; + CarbonProjection carbonProjection = new CarbonProjection(); + carbonProjection.addColumn("ID"); + carbonProjection.addColumn("date"); + carbonProjection.addColumn("country"); + carbonProjection.addColumn("name"); + carbonProjection.addColumn("phonetype"); + carbonProjection.addColumn("serialname"); + carbonProjection.addColumn("salary"); + runJob(outPath, carbonProjection, null); + Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); + Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath)); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue("failed", false); + throw e; + } finally { + StoreCreator.clearDataMaps(); + } + } + + @Test public void testInputFormatMapperReadAllRowsAndFewColumns() throws Exception { + try { + String outPath = "target/output2"; + CarbonProjection carbonProjection = new CarbonProjection(); + carbonProjection.addColumn("ID"); + carbonProjection.addColumn("country"); + carbonProjection.addColumn("salary"); + runJob(outPath, carbonProjection, null); + + Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); + Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath)); + } catch (Exception e) { + e.printStackTrace(); + Assert.assertTrue("failed", false); + } finally { + StoreCreator.clearDataMaps(); + } + } + + @Test public void 
testInputFormatMapperReadAllRowsAndFewColumnsWithFilter() throws Exception { + try { + String outPath = "target/output3"; + CarbonProjection carbonProjection = new CarbonProjection(); + carbonProjection.addColumn("ID"); + carbonProjection.addColumn("country"); + carbonProjection.addColumn("salary"); + Expression expression = + new EqualToExpression(new ColumnExpression("country", DataTypes.STRING), + new LiteralExpression("france", DataTypes.STRING)); + runJob(outPath, carbonProjection, expression); + Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath)); + Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath)); + } catch (Exception e) { + Assert.assertTrue("failed", false); + } finally { + StoreCreator.clearDataMaps(); + } + } + + + private int countTheLines(String outPath) throws Exception { + File file = new File(outPath); + if (file.exists()) { + BufferedReader reader = new BufferedReader(new FileReader(file)); + int i = 0; + while (reader.readLine() != null) { + i++; + } + reader.close(); + return i; + } + return 0; + } + + private int countTheColumns(String outPath) throws Exception { + File file = new File(outPath); + if (file.exists()) { + BufferedReader reader = new BufferedReader(new FileReader(file)); + String[] split = reader.readLine().split(","); + reader.close(); + return split.length; + } + return 0; + } + + public static class Map extends Mapper<Void, Object[], Text, Text> { + + private BufferedWriter fileWriter; + + public void setup(Context context) throws IOException, InterruptedException { + String outPath = context.getConfiguration().get("outpath"); + File outFile = new File(outPath); + try { + fileWriter = new BufferedWriter(new FileWriter(outFile)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void map(Void key, Object[] value, Context context) throws IOException { + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < value.length; i++) { + 
builder.append(value[i]).append(","); + } + fileWriter.write(builder.toString().substring(0, builder.toString().length() - 1)); + fileWriter.newLine(); + } + + @Override public void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + fileWriter.close(); + context.write(new Text(), new Text()); + } + } + + private void runJob(String outPath, CarbonProjection projection, Expression filter) + throws Exception { + + Configuration configuration = new Configuration(); + configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath()); + Job job = Job.getInstance(configuration); + job.setJarByClass(CarbonTableInputFormatTest.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + job.setMapperClass(Map.class); + job.setInputFormatClass(CarbonTableInputFormat.class); + job.setOutputFormatClass(TextOutputFormat.class); + AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier(); + if (projection != null) { + CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection); + } + if (filter != null) { + CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter); + } + CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), + abs.getCarbonTableIdentifier().getDatabaseName()); + CarbonTableInputFormat.setTableName(job.getConfiguration(), + abs.getCarbonTableIdentifier().getTableName()); + FileInputFormat.addInputPath(job, new Path(abs.getTablePath())); + CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1")); + FileOutputFormat.setOutputPath(job, new Path(outPath + "1")); + job.getConfiguration().set("outpath", outPath); + job.getConfiguration().set("query.id", String.valueOf(System.nanoTime())); + boolean status = job.waitForCompletion(true); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java 
---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java deleted file mode 100644 index bb37959..0000000 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableInputMapperTest.java +++ /dev/null @@ -1,219 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.carbondata.hadoop.ft; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; - -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.hadoop.CarbonProjection; -import org.apache.carbondata.core.scan.expression.ColumnExpression; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.expression.LiteralExpression; -import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; -import org.apache.carbondata.hadoop.api.CarbonTableInputFormat; -import org.apache.carbondata.hadoop.test.util.StoreCreator; - -import junit.framework.TestCase; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.Mapper; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class CarbonTableInputMapperTest extends TestCase { - - // changed setUp to static init block to avoid un wanted multiple time store creation - static { - CarbonProperties.getInstance(). 
- addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); - StoreCreator.createCarbonStore(); - } - - @Test public void testInputFormatMapperReadAllRowsAndColumns() throws Exception { - try { - String outPath = "target/output"; - CarbonProjection carbonProjection = new CarbonProjection(); - carbonProjection.addColumn("ID"); - carbonProjection.addColumn("date"); - carbonProjection.addColumn("country"); - carbonProjection.addColumn("name"); - carbonProjection.addColumn("phonetype"); - carbonProjection.addColumn("serialname"); - carbonProjection.addColumn("salary"); - runJob(outPath, carbonProjection, null); - Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); - Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath)); - } catch (Exception e) { - e.printStackTrace(); - Assert.assertTrue("failed", false); - throw e; - } finally { - StoreCreator.clearDataMaps(); - } - } - - @Test public void testInputFormatMapperReadAllRowsAndFewColumns() throws Exception { - try { - String outPath = "target/output2"; - CarbonProjection carbonProjection = new CarbonProjection(); - carbonProjection.addColumn("ID"); - carbonProjection.addColumn("country"); - carbonProjection.addColumn("salary"); - runJob(outPath, carbonProjection, null); - - Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); - Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath)); - } catch (Exception e) { - e.printStackTrace(); - Assert.assertTrue("failed", false); - } finally { - StoreCreator.clearDataMaps(); - } - } - - @Test public void testInputFormatMapperReadAllRowsAndFewColumnsWithFilter() throws Exception { - try { - String outPath = "target/output3"; - CarbonProjection carbonProjection = new CarbonProjection(); - carbonProjection.addColumn("ID"); - carbonProjection.addColumn("country"); - carbonProjection.addColumn("salary"); - Expression expression = - new 
EqualToExpression(new ColumnExpression("country", DataTypes.STRING), - new LiteralExpression("france", DataTypes.STRING)); - runJob(outPath, carbonProjection, expression); - Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath)); - Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath)); - } catch (Exception e) { - Assert.assertTrue("failed", false); - } finally { - StoreCreator.clearDataMaps(); - } - } - - private int countTheLines(String outPath) throws Exception { - File file = new File(outPath); - if (file.exists()) { - BufferedReader reader = new BufferedReader(new FileReader(file)); - int i = 0; - while (reader.readLine() != null) { - i++; - } - reader.close(); - return i; - } - return 0; - } - - private int countTheColumns(String outPath) throws Exception { - File file = new File(outPath); - if (file.exists()) { - BufferedReader reader = new BufferedReader(new FileReader(file)); - String[] split = reader.readLine().split(","); - reader.close(); - return split.length; - } - return 0; - } - - @Override public void tearDown() throws Exception { - super.tearDown(); - CarbonProperties.getInstance() - .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true"); - } - - public static class Map extends Mapper<Void, Object[], Text, Text> { - - private BufferedWriter fileWriter; - - public void setup(Context context) throws IOException, InterruptedException { - String outPath = context.getConfiguration().get("outpath"); - File outFile = new File(outPath); - try { - fileWriter = new BufferedWriter(new FileWriter(outFile)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public void map(Void key, Object[] value, Context context) throws IOException { - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < value.length; i++) { - builder.append(value[i]).append(","); - } - fileWriter.write(builder.toString().substring(0, builder.toString().length() - 1)); - fileWriter.newLine(); 
- } - - @Override public void cleanup(Context context) throws IOException, InterruptedException { - super.cleanup(context); - fileWriter.close(); - context.write(new Text(), new Text()); - } - } - - private void runJob(String outPath, CarbonProjection projection, Expression filter) - throws Exception { - - Configuration configuration = new Configuration(); - configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath()); - Job job = Job.getInstance(configuration); - job.setJarByClass(CarbonTableInputMapperTest.class); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(IntWritable.class); - job.setMapperClass(Map.class); - job.setInputFormatClass(CarbonTableInputFormat.class); - job.setOutputFormatClass(TextOutputFormat.class); - AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier(); - if (projection != null) { - CarbonTableInputFormat.setColumnProjection(job.getConfiguration(), projection); - } - if (filter != null) { - CarbonTableInputFormat.setFilterPredicates(job.getConfiguration(), filter); - } - CarbonTableInputFormat.setDatabaseName(job.getConfiguration(), - abs.getCarbonTableIdentifier().getDatabaseName()); - CarbonTableInputFormat.setTableName(job.getConfiguration(), - abs.getCarbonTableIdentifier().getTableName()); - FileInputFormat.addInputPath(job, new Path(abs.getTablePath())); - CarbonUtil.deleteFoldersAndFiles(new File(outPath + "1")); - FileOutputFormat.setOutputPath(job, new Path(outPath + "1")); - job.getConfiguration().set("outpath", outPath); - job.getConfiguration().set("query.id", String.valueOf(System.nanoTime())); - boolean status = job.waitForCompletion(true); - } - - public static void main(String[] args) throws Exception { - new CarbonTableInputMapperTest().runJob("target/output", null, null); - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java 
---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java new file mode 100644 index 0000000..9bb2f53 --- /dev/null +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonTableOutputFormatTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.hadoop.ft; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; + +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonUtil; +import org.apache.carbondata.core.util.path.CarbonTablePath; +import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; +import org.apache.carbondata.hadoop.test.util.StoreCreator; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class CarbonTableOutputFormatTest { + + static CarbonLoadModel carbonLoadModel; + + // changed setUp to static init block to avoid un wanted multiple time store creation + static { + CarbonProperties.getInstance(). 
+ addProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, "/tmp/carbon/badrecords"); + try { + carbonLoadModel = StoreCreator.createTableAndLoadModel(); + } catch (Exception e) { + Assert.fail("create table failed: " + e.getMessage()); + } + } + + + @Test public void testOutputFormat() throws Exception { + runJob(""); + String segmentPath = CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath(), "0"); + File file = new File(segmentPath); + assert (file.exists()); + File[] listFiles = file.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.endsWith(".carbondata") || + name.endsWith(".carbonindex") || + name.endsWith(".carbonindexmerge"); + } + }); + + assert (listFiles.length == 2); + } + + @After + public void tearDown() throws Exception { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true"); + } + + @Before + public void setUp() throws Exception { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false"); + } + + public static class Map extends Mapper<NullWritable, StringArrayWritable, NullWritable, StringArrayWritable> { + + @Override protected void map(NullWritable key, StringArrayWritable value, Context context) + throws IOException, InterruptedException { + context.write(key, value); + } + } + + private void runJob(String outPath) throws Exception { + Configuration configuration = new Configuration(); + configuration.set("mapreduce.cluster.local.dir", new File(outPath + "1").getCanonicalPath()); + Job job = Job.getInstance(configuration); + job.setJarByClass(CarbonTableOutputFormatTest.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(StringArrayWritable.class); + job.setMapperClass(Map.class); + job.setNumReduceTasks(0); + + FileInputFormat.addInputPath(job, new Path(carbonLoadModel.getFactFilePath())); + CarbonTableOutputFormat.setLoadModel(job.getConfiguration(), 
carbonLoadModel); + CarbonTableOutputFormat.setCarbonTable(job.getConfiguration(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable()); + CSVInputFormat.setHeaderExtractionEnabled(job.getConfiguration(), true); + job.setInputFormatClass(CSVInputFormat.class); + job.setOutputFormatClass(CarbonTableOutputFormat.class); + CarbonUtil.deleteFoldersAndFiles(new File(carbonLoadModel.getTablePath() + "1")); + FileOutputFormat.setOutputPath(job, new Path(carbonLoadModel.getTablePath() + "1")); + job.getConfiguration().set("outpath", outPath); + job.getConfiguration().set("query.id", String.valueOf(System.nanoTime())); + job.waitForCompletion(true); + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index fc54238..fbf33d6 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -163,15 +163,9 @@ public class StoreCreator { /** * Create store without any restructure */ - public static void createCarbonStore() { - try { - CarbonLoadModel loadModel = getCarbonLoadModel(); - - executeGraph(loadModel, storePath); - - } catch (Exception e) { - e.printStackTrace(); - } + public static void createCarbonStore() throws Exception { + CarbonLoadModel loadModel = createTableAndLoadModel(); + loadData(loadModel, storePath); } /** @@ -181,7 +175,7 @@ public class StoreCreator { DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier); } - public static CarbonLoadModel getCarbonLoadModel() throws Exception { + public static CarbonLoadModel createTableAndLoadModel() throws Exception { String factFilePath = new 
File("../hadoop/src/test/resources/data.csv").getCanonicalPath(); File storeDir = new File(storePath); @@ -387,7 +381,7 @@ public class StoreCreator { * @param storeLocation * @throws Exception */ - public static void executeGraph(CarbonLoadModel loadModel, String storeLocation) + public static void loadData(CarbonLoadModel loadModel, String storeLocation) throws Exception { new File(storeLocation).mkdirs(); String outPutLoc = storeLocation + "/etl"; @@ -519,7 +513,7 @@ public class StoreCreator { return date; } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { StoreCreator.createCarbonStore(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala index dbe7445..bb1d26f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/IUDConcurrentTest.scala @@ -25,18 +25,31 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{DataFrame, SaveMode} -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter} +import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} +import org.apache.carbondata.core.datastore.page.ColumnPage +import 
org.apache.carbondata.core.indexstore.schema.FilterType +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.events.Event +import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory -class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll { +class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach { private val executorService: ExecutorService = Executors.newFixedThreadPool(10) var df: DataFrame = _ override def beforeAll { dropTable() buildTestData() + + // register hook to the table to sleep, thus the other command will be executed + DataMapStoreManager.getInstance().createAndRegisterDataMap( + AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"), + classOf[WaitingDataMap].getName, + "test") } private def buildTestData(): Unit = { @@ -48,7 +61,7 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll { import sqlContext.implicits._ val sdf = new SimpleDateFormat("yyyy-MM-dd") - df = sqlContext.sparkSession.sparkContext.parallelize(1 to 1500000) + df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000) .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime), "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value, "ordersTable" + value)) @@ -62,8 +75,7 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll { df.write .format("carbondata") .option("tableName", tableName) - .option("tempCSV", "true") - .option("compress", "true") + .option("tempCSV", "false") .mode(SaveMode.Overwrite) .save() } @@ -73,34 +85,53 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll { dropTable() } + override def beforeEach(): Unit = { + Global.overwriteRunning = false + } + private def dropTable() = { sql("DROP TABLE IF EXISTS orders") sql("DROP TABLE IF EXISTS orders_overwrite") } - 
test("Concurrency test for Insert-Overwrite and compact") { - val tasks = new java.util.ArrayList[Callable[String]]() - tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) - tasks.add(new QueryTask("alter table orders compact 'MINOR'")) - val futures: util.List[Future[String]] = executorService.invokeAll(tasks) - val results = futures.asScala.map(_.get) - assert(results.contains("PASS")) + // run the input SQL and block until it is running + private def runSqlAsync(sql: String): Future[String] = { + assert(!Global.overwriteRunning) + var count = 0 + val future = executorService.submit( + new QueryTask(sql) + ) + while (!Global.overwriteRunning && count < 1000) { + Thread.sleep(10) + // to avoid dead loop in case WaitingDataMap is not invoked + count += 1 + } + future + } + + test("compaction should fail if insert overwrite is in progress") { + val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite") + val ex = intercept[Exception]{ + sql("alter table orders compact 'MINOR'") + } + assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot run data loading and compaction on same table concurrently")) } - test("Concurrency test for Insert-Overwrite and update") { - val tasks = new java.util.ArrayList[Callable[String]]() - tasks.add(new QueryTask(s"insert overWrite table orders select * from orders_overwrite")) - tasks.add(new QueryTask("update orders set (o_country)=('newCountry') where o_country='china'")) - val futures: util.List[Future[String]] = executorService.invokeAll(tasks) - val results = futures.asScala.map(_.get) - assert("PASS".equals(results.head) && "FAIL".equals(results(1))) + test("update should fail if insert overwrite is in progress") { + val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite") + val ex = intercept[Exception] { + sql("update orders set (o_country)=('newCountry') where o_country='china'").show + } + 
assert(future.get.contains("PASS")) + assert(ex.getMessage.contains("Cannot run data loading and update on same table concurrently")) } class QueryTask(query: String) extends Callable[String] { override def call(): String = { var result = "PASS" try { - sql(query).show() + sql(query).collect() } catch { case exception: Exception => LOGGER.error(exception.getMessage) result = "FAIL" @@ -109,4 +140,47 @@ class IUDConcurrentTest extends QueryTest with BeforeAndAfterAll { } } +} + +object Global { + @volatile var overwriteRunning = false // written by the loading thread (WaitingDataMap), polled by runSqlAsync on the test thread; volatile so the poll loop sees the update + } + +class WaitingDataMap() extends DataMapFactory { + + override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { } + + override def fireEvent(event: Event): Unit = ??? + + override def clear(segmentId: String): Unit = {} + + override def clear(): Unit = {} + + override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ??? + + override def getDataMaps(segmentId: String): util.List[DataMap] = ??? + + override def createWriter(segmentId: String): DataMapWriter = { + new DataMapWriter { + override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { } + + override def onBlockletEnd(blockletId: Int): Unit = { } + + override def onBlockEnd(blockId: String): Unit = { } + + override def onBlockletStart(blockletId: Int): Unit = { } + + override def onBlockStart(blockId: String): Unit = { + // trigger the second SQL to execute + Global.overwriteRunning = true + + // wait 1 second so the second SQL runs while this load is still in progress + Thread.sleep(1000) + } + } + } + + override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO) + + override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ??? 
+} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/6e77f2b9/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 9d55d30..4cb9fdd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -482,7 +482,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { .substring(0, this.carbonDataFileTempPath.lastIndexOf('.'))); File curFile = new File(this.carbonDataFileTempPath); if (!curFile.renameTo(origFile)) { - throw new CarbonDataWriterException("Problem while renaming the file"); + throw new CarbonDataWriterException("Problem while renaming the file (" + curFile + + "), to file (" + origFile + ")"); } }
