Repository: carbondata Updated Branches: refs/heads/master a03335759 -> 4b8dc0a58
[CARBONDATA-2423][SDK]SDK Reader support to read from Non Transactional Table This closes #2257 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b8dc0a5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b8dc0a5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b8dc0a5 Branch: refs/heads/master Commit: 4b8dc0a58aba542211b3908aa31456444e6bfe04 Parents: a033357 Author: sounakr <[email protected]> Authored: Wed May 2 08:56:09 2018 +0530 Committer: ravipesala <[email protected]> Committed: Wed May 2 20:08:27 2018 +0530 ---------------------------------------------------------------------- .../core/datastore/SegmentTaskIndexStore.java | 2 +- .../core/metadata/schema/table/CarbonTable.java | 15 ++++++-- .../hadoop/api/CarbonFileInputFormat.java | 34 ++++++++++++++--- .../sdk/file/CarbonReaderBuilder.java | 9 ++++- .../sdk/file/CSVCarbonWriterTest.java | 7 +--- .../carbondata/sdk/file/CarbonReaderTest.java | 39 ++++++++++++++++++++ .../apache/carbondata/sdk/file/TestUtil.java | 24 +++++++++--- 7 files changed, 108 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java index d9e544f..537c635 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java @@ -90,7 +90,7 @@ public class SegmentTaskIndexStore segmentTaskIndexWrapper = loadAndGetTaskIdToSegmentsMap( tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(), - CarbonTable.buildFromTablePath("name", "path"), + CarbonTable.buildFromTablePath("name", "path", false), tableSegmentUniqueIdentifier); } catch (IndexBuilderException e) { throw new IOException(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index 9ae5ed4..1875237 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -244,10 +244,17 @@ public class CarbonTable implements Serializable { return buildFromTableInfo(tableInfo); } - public static CarbonTable buildFromTablePath( - String tableName, String tablePath) throws IOException { - return SchemaReader.readCarbonTableFromStore( - AbsoluteTableIdentifier.from(tablePath, tableName, "default")); + public static CarbonTable buildFromTablePath(String tableName, String tablePath, + boolean isTransactionalTable) throws IOException { + if (isTransactionalTable) { + return SchemaReader + .readCarbonTableFromStore(AbsoluteTableIdentifier.from(tablePath, "default", tableName)); + } else { + // Infer the schema from the Carbondata file. + TableInfo tableInfoInfer = + SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "null", "null"), false); + return CarbonTable.buildFromTableInfo(tableInfoInfer); + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java index 3dac3bb..2af147d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java @@ -42,6 +42,7 @@ import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.filter.SingleTableProvider; import org.apache.carbondata.core.scan.filter.TableProvider; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; +import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -116,8 +117,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se // get all valid segments and set them into the configuration // check for externalTable segment (Segment_null) // process and resolve the expression - ReadCommittedScope readCommittedScope = new LatestFilesReadCommittedScope( - identifier.getTablePath() + "/Fact/Part0/Segment_null/"); + ReadCommittedScope readCommittedScope = null; + if (carbonTable.isTransactionalTable()) { + readCommittedScope = new LatestFilesReadCommittedScope( + identifier.getTablePath() + "/Fact/Part0/Segment_null/"); + } else { + readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath()); + } Expression filter = getFilterPredicates(job.getConfiguration()); TableProvider tableProvider = new SingleTableProvider(carbonTable); // this will be null in case of corrupt schema file. @@ -126,13 +132,31 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider); - String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null"); + String segmentDir = null; + if (carbonTable.isTransactionalTable()) { + segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), "null"); + } else { + segmentDir = identifier.getTablePath(); + } FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); if (FileFactory.isFileExist(segmentDir, fileType)) { // if external table Segments are found, add it to the List List<Segment> externalTableSegments = new ArrayList<Segment>(); - Segment seg = new Segment("null", null, readCommittedScope); - externalTableSegments.add(seg); + Segment seg; + if (carbonTable.isTransactionalTable()) { + // SDK some cases write into the Segment Path instead of Table Path i.e. inside + // the "Fact/Part0/Segment_null". The segment in this case is named as "null". + // The table is denoted by default as a transactional table and goes through + // the path of CarbonFileInputFormat. The above scenario is handled in the below code. + seg = new Segment("null", null, readCommittedScope); + externalTableSegments.add(seg); + } else { + LoadMetadataDetails[] loadMetadataDetails = readCommittedScope.getSegmentList(); + for (LoadMetadataDetails load : loadMetadataDetails) { + seg = new Segment(load.getLoadName(), null, readCommittedScope); + externalTableSegments.add(seg); + } + } Map<String, String> indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java index 9560ef7..d15e548 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonReaderBuilder.java @@ -48,6 +48,7 @@ public class CarbonReaderBuilder { private String[] projectionColumns; private Expression filterExpression; private String tableName; + private boolean isTransactionalTable = true; CarbonReaderBuilder(String tablePath, String tableName) { this.tablePath = tablePath; @@ -60,6 +61,12 @@ public class CarbonReaderBuilder { return this; } + public CarbonReaderBuilder isTransactionalTable(boolean isTransactionalTable) { + Objects.requireNonNull(isTransactionalTable); + this.isTransactionalTable = isTransactionalTable; + return this; + } + public CarbonReaderBuilder filter(Expression fileterExpression) { Objects.requireNonNull(fileterExpression); this.filterExpression = fileterExpression; @@ -134,7 +141,7 @@ public class CarbonReaderBuilder { } public <T> CarbonReader<T> build() throws IOException, InterruptedException { - CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath); + CarbonTable table = CarbonTable.buildFromTablePath(tableName, tablePath, isTransactionalTable); final CarbonFileInputFormat format = new CarbonFileInputFormat(); final Job job = new Job(new Configuration()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java index c4dcee9..ba3d3ac 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java @@ -19,14 +19,11 @@ package org.apache.carbondata.sdk.file; import java.io.File; import java.io.FileFilter; -import java.io.FilenameFilter; import java.io.IOException; -import org.apache.carbondata.common.Strings; import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataTypes; -import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.commons.io.FileUtils; @@ -137,7 +134,7 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100); + TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100, false); // TODO: implement reader to verify the number of blocklet in the file @@ -153,7 +150,7 @@ public class CSVCarbonWriterTest { fields[0] = new Field("name", DataTypes.STRING); fields[1] = new Field("age", DataTypes.INT); - TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2); + TestUtil.writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2, true); File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); File[] dataFiles = segmentFolder.listFiles(new FileFilter() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java index bb1a7c6..f026499 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CarbonReaderTest.java @@ -125,4 +125,43 @@ public class CarbonReaderTest { FileUtils.deleteDirectory(new File(path)); } + + + @Test + public void testWriteAndReadFilesNonTransactional() throws IOException, InterruptedException { + String path = "./testWriteFiles"; + FileUtils.deleteDirectory(new File(path)); + + Field[] fields = new Field[2]; + fields[0] = new Field("name", DataTypes.STRING); + fields[1] = new Field("age", DataTypes.INT); + + // Write to a Non Transactional Table + TestUtil.writeFilesAndVerify(new Schema(fields), path, true, false); + + CarbonReader reader = CarbonReader.builder(path, "_temp") + .projection(new String[]{"name", "age"}) + .isTransactionalTable(false) + .build(); + + // expected output after sorting + String[] name = new String[100]; + int[] age = new int[100]; + for (int i = 0; i < 100; i++) { + name[i] = "robot" + (i / 10); + age[i] = (i % 10) * 10 + i / 10; + } + + int i = 0; + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + // Default sort column is applied for dimensions. So, need to validate accordingly + Assert.assertEquals(name[i], row[0]); + Assert.assertEquals(age[i], row[1]); + i++; + } + Assert.assertEquals(i, 100); + + FileUtils.deleteDirectory(new File(path)); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b8dc0a5/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java index 03aecb8..6870f36 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java +++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java @@ -34,11 +34,16 @@ public class TestUtil { } static void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) { - writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1); + writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1, true); } public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) { - writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1); + writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, true); + } + + public static void writeFilesAndVerify(Schema schema, String path, boolean persistSchema, + boolean isTransactionalTable) { + writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1, isTransactionalTable); } /** @@ -50,13 +55,14 @@ public class TestUtil { * @param persistSchema true if want to persist schema file * @param blockletSize blockletSize in the file, -1 for default size * @param blockSize blockSize in the file, -1 for default size + * @param isTransactionalTable set to true if this is written for Transactional Table. */ static void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns, - boolean persistSchema, int blockletSize, int blockSize) { + boolean persistSchema, int blockletSize, int blockSize, boolean isTransactionalTable) { try { CarbonWriterBuilder builder = CarbonWriter.builder() .withSchema(schema) - .isTransactionalTable(true) + .isTransactionalTable(isTransactionalTable) .outputPath(path); if (sortColumns != null) { builder = builder.sortBy(sortColumns); @@ -85,8 +91,14 @@ public class TestUtil { Assert.fail(l.getMessage()); } - File segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); - Assert.assertTrue(segmentFolder.exists()); + File segmentFolder = null; + if (isTransactionalTable) { + segmentFolder = new File(CarbonTablePath.getSegmentPath(path, "null")); + Assert.assertTrue(segmentFolder.exists()); + } else { + segmentFolder = new File(path); + Assert.assertTrue(segmentFolder.exists()); + } File[] dataFiles = segmentFolder.listFiles(new FileFilter() { @Override public boolean accept(File pathname) {
