Repository: carbondata Updated Branches: refs/heads/master 0ef7e55c4 -> 290ef5a3a
[HOTFIX][CARBONDATA-2591] Fix SDK CarbonReader filter issue There are some issues in the SDK CarbonReader filter function; please check the JIRA. This closes #2363 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/290ef5a3 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/290ef5a3 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/290ef5a3 Branch: refs/heads/master Commit: 290ef5a3a90081b3c95ea0dc418f643ea5ad694f Parents: 0ef7e55 Author: xubo245 <xub...@huawei.com> Authored: Thu Jun 7 22:00:31 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Tue Jun 12 10:54:04 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/schema/table/CarbonTable.java | 35 +++ .../apache/carbondata/core/util/CarbonUtil.java | 1 + .../sdk/file/CarbonReaderBuilder.java | 6 +- .../carbondata/sdk/file/CarbonReaderTest.java | 251 ++++++++++++++++++- .../apache/carbondata/sdk/file/TestUtil.java | 14 ++ 5 files changed, 304 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 6949643..20bc7a1 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -17,6 +17,8 @@ package org.apache.carbondata.core.metadata.schema.table; +import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; 
@@ -57,6 +59,8 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema; + /** * Mapping class for Carbon actual table */ @@ -218,6 +222,37 @@ public class CarbonTable implements Serializable { } } + public static CarbonTable buildTable( + String tablePath, + String tableName) throws IOException { + TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null"); + File[] dataFiles = new File(tablePath).listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + if (name == null) { + return false; + } + return name.endsWith("carbonindex"); + } + }); + if (dataFiles == null || dataFiles.length < 1) { + throw new RuntimeException("Carbon index file not exists."); + } + org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil + .inferSchemaFromIndexFile(dataFiles[0].toString(), tableName); + List<ColumnSchema> columnSchemaList = new ArrayList<ColumnSchema>(); + for (org.apache.carbondata.format.ColumnSchema thriftColumnSchema : tableInfo + .getFact_table().getTable_columns()) { + ColumnSchema columnSchema = thriftColumnSchemaToWrapperColumnSchema(thriftColumnSchema); + if (columnSchema.getColumnReferenceId() == null) { + columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId()); + } + columnSchemaList.add(columnSchema); + } + tableInfoInfer.getFactTable().setListOfColumns(columnSchemaList); + return CarbonTable.buildFromTableInfo(tableInfoInfer); + } + public static CarbonTable buildDummyTable(String tablePath) throws IOException { TableInfo tableInfoInfer = CarbonUtil.buildDummyTableInfo(tablePath, "null", "null"); return CarbonTable.buildFromTableInfo(tableInfoInfer); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index e1e5e16..2aa4a05 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -2209,6 +2209,7 @@ public final class CarbonUtil { org.apache.carbondata.format.ColumnSchema externalColumnSchema) { ColumnSchema wrapperColumnSchema = new ColumnSchema(); wrapperColumnSchema.setColumnUniqueId(externalColumnSchema.getColumn_id()); + wrapperColumnSchema.setColumnReferenceId(externalColumnSchema.getColumnReferenceId()); wrapperColumnSchema.setColumnName(externalColumnSchema.getColumn_name()); wrapperColumnSchema.setColumnar(externalColumnSchema.isColumnar()); DataType dataType = thriftDataTyopeToWrapperDataType(externalColumnSchema.data_type); http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index 98aa6e0..83cb34e 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -179,7 +179,11 @@ public class CarbonReaderBuilder { if (isTransactionalTable) { table = CarbonTable.buildFromTablePath(tableName, "default", tablePath); } else { - table = CarbonTable.buildDummyTable(tablePath); + if (filterExpression != null) { + table = CarbonTable.buildTable(tablePath, tableName); + } 
else { + table = CarbonTable.buildDummyTable(tablePath); + } } final CarbonFileInputFormat format = new CarbonFileInputFormat(); final Job job = new Job(new Configuration()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java index a8aa795..fb2e2bc 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java @@ -20,8 +20,7 @@ package org.apache.carbondata.sdk.file; import java.io.*; import java.sql.Date; import java.sql.Timestamp; -import java.util.Arrays; -import java.util.Comparator; +import java.util.*; import org.apache.avro.generic.GenericData; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; @@ -29,6 +28,11 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.OrExpression; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -48,6 +52,12 @@ public class CarbonReaderTest extends TestCase { @After public void verifyDMFile() { assert (!TestUtil.verifyMdtFile()); + String path = 
"./testWriteFiles"; + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + e.printStackTrace(); + } } @Test @@ -106,6 +116,243 @@ public class CarbonReaderTest extends TestCase { } @Test + public void testReadWithFilterOfTransactional() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true); + + EqualToExpression equalToExpression = new EqualToExpression( + new ColumnExpression("name", DataTypes.STRING), + new LiteralExpression("robot1", DataTypes.STRING)); + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .isTransactionalTable(true) + .projection(new String[]{"name", "age"}) + .filter(equalToExpression) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + // Default sort column is applied for dimensions. 
So, need to validate accordingly + assert ("robot1".equals(row[0])); + i++; + } + Assert.assertEquals(i, 20); + + reader.close(); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testReadWithFilterOfTransactionalAnd() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[3]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + fields[2] = new Field("doubleField", DataTypes.DOUBLE); + + TestUtil.writeFilesAndVerify(200, new Schema(fields), path, true); + + ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE); + EqualToExpression equalToExpression = new EqualToExpression(columnExpression, + new LiteralExpression("3.5", DataTypes.DOUBLE)); + + ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING); + EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2, + new LiteralExpression("robot7", DataTypes.STRING)); + + AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2); + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .isTransactionalTable(true) + .projection(new String[]{"name", "age", "doubleField"}) + .filter(andExpression) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + assert (((String) row[0]).contains("robot7")); + assert (7 == (int) (row[1])); + assert (3.5 == (double) (row[2])); + i++; + } + Assert.assertEquals(i, 1); + + reader.close(); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testReadWithFilterOfNonTransactionalSimple() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", 
DataTypes.INT); + + TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false); + + ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING); + EqualToExpression equalToExpression = new EqualToExpression(columnExpression, + new LiteralExpression("robot1", DataTypes.STRING)); + + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .isTransactionalTable(false) + .projection(new String[]{"name", "age"}) + .filter(equalToExpression) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + // Default sort column is applied for dimensions. So, need to validate accordingly + assert ("robot1".equals(row[0])); + i++; + } + Assert.assertEquals(i, 20); + + reader.close(); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testReadWithFilterOfNonTransactional2() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false); + + ColumnExpression columnExpression = new ColumnExpression("age", DataTypes.INT); + + EqualToExpression equalToExpression = new EqualToExpression(columnExpression, + new LiteralExpression("1", DataTypes.INT)); + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .isTransactionalTable(false) + .projection(new String[]{"name", "age"}) + .filter(equalToExpression) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + // Default sort column is applied for dimensions. 
So, need to validate accordingly + assert (((String) row[0]).contains("robot")); + assert (1 == (int) (row[1])); + i++; + } + Assert.assertEquals(i, 1); + + reader.close(); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testReadWithFilterOfNonTransactionalAnd() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[3]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + fields[2] = new Field("doubleField", DataTypes.DOUBLE); + + TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false); + + ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE); + EqualToExpression equalToExpression = new EqualToExpression(columnExpression, + new LiteralExpression("3.5", DataTypes.DOUBLE)); + + ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING); + EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2, + new LiteralExpression("robot7", DataTypes.STRING)); + + AndExpression andExpression = new AndExpression(equalToExpression, equalToExpression2); + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .isTransactionalTable(false) + .projection(new String[]{"name", "age", "doubleField"}) + .filter(andExpression) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + assert (((String) row[0]).contains("robot7")); + assert (7 == (int) (row[1])); + assert (3.5 == (double) (row[2])); + i++; + } + Assert.assertEquals(i, 1); + + reader.close(); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test + public void testReadWithFilterOfNonTransactionalOr() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[3]; + fields[0] = new Field("name", 
DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + fields[2] = new Field("doubleField", DataTypes.DOUBLE); + + TestUtil.writeFilesAndVerify(200, new Schema(fields), path, false, false); + + ColumnExpression columnExpression = new ColumnExpression("doubleField", DataTypes.DOUBLE); + EqualToExpression equalToExpression = new EqualToExpression(columnExpression, + new LiteralExpression("3.5", DataTypes.DOUBLE)); + + ColumnExpression columnExpression2 = new ColumnExpression("name", DataTypes.STRING); + EqualToExpression equalToExpression2 = new EqualToExpression(columnExpression2, + new LiteralExpression("robot7", DataTypes.STRING)); + + OrExpression andExpression = new OrExpression(equalToExpression, equalToExpression2); + CarbonReader reader = CarbonReader + .builder(path, "_temp") + .isTransactionalTable(false) + .projection(new String[]{"name", "age", "doubleField"}) + .filter(andExpression) + .build(); + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + assert (((String) row[0]).contains("robot7")); + assert (7 == ((int) (row[1]) % 10)); + assert (0.5 == ((double) (row[2]) % 1)); + i++; + } + Assert.assertEquals(i, 20); + + reader.close(); + + FileUtils.deleteDirectory(new File(path)); + } + + @Test public void testReadColumnTwice() throws IOException, InterruptedException { String path = "./testWriteFiles"; FileUtils.deleteDirectory(new File(path)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/290ef5a3/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java index 0f00d61..919472c 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -49,6 +49,20 @@ public 
class TestUtil { } /** + * write file and verify + * + * @param rows number of rows + * @param schema schema + * @param path table store path + * @param persistSchema whether persist schema + * @param isTransactionalTable whether is transactional table + */ + public static void writeFilesAndVerify(int rows, Schema schema, String path, boolean persistSchema, + boolean isTransactionalTable) { + writeFilesAndVerify(rows, schema, path, null, persistSchema, -1, -1, isTransactionalTable); + } + + /** * Invoke CarbonWriter API to write carbon files and assert the file is rewritten * @param rows number of rows to write * @param schema schema of the file