http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index cd1e28a..d30891a 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -63,30 +63,6 @@ public class BlockIndexStoreTest extends TestCase { } -// public void testLoadAndGetTaskIdToSegmentsMapForSingleSegment() -// throws IOException { -// File file = getPartFile(); -// TableBlockInfo info = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// CarbonTableIdentifier carbonTableIdentifier = -// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = -// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); -// try { -// -// List<TableBlockUniqueIdentifier> tableBlockInfoList = -// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier); -// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList); -// assertTrue(loadAndGetBlocks.size() == 1); -// } catch (Exception e) { -// assertTrue(false); -// } -// List<String> segmentIds = new ArrayList<>(); -// segmentIds.add(info.getSegment()); -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); -// } -// private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos, AbsoluteTableIdentifier absoluteTableIdentifier) { List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = new ArrayList<>(); @@ -95,138 +71,6 @@ public class BlockIndexStoreTest extends TestCase { } return tableBlockUniqueIdentifiers; } -// -// public void testloadAndGetTaskIdToSegmentsMapForSameBlockLoadedConcurrently() -// throws IOException { -// String canonicalPath = -// new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath(); -// File file = getPartFile(); -// TableBlockInfo info = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// TableBlockInfo info1 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// -// TableBlockInfo info2 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// TableBlockInfo info3 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// TableBlockInfo info4 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V1, null); -// -// CarbonTableIdentifier carbonTableIdentifier = -// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = -// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); -// ExecutorService executor = Executors.newFixedThreadPool(3); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); -// executor.submit( -// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); -// executor.submit( -// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); -// executor.shutdown(); -// try { -// executor.awaitTermination(1, TimeUnit.DAYS); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } -// List<TableBlockInfo> tableBlockInfos = -// Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }); -// try { -// List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = -// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); -// List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers); -// assertTrue(loadAndGetBlocks.size() == 5); -// } catch (Exception e) { -// assertTrue(false); -// } -// List<String> segmentIds = new ArrayList<>(); -// for (TableBlockInfo tableBlockInfo : tableBlockInfos) { -// segmentIds.add(tableBlockInfo.getSegment()); -// } -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); -// } -// -// public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently() -// throws IOException { -// String canonicalPath = -// new File(this.getClass().getResource("/").getPath() + "/../../").getCanonicalPath(); -// File file = getPartFile(); -// TableBlockInfo info = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// TableBlockInfo info1 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "0", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// TableBlockInfo info2 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// TableBlockInfo info3 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// TableBlockInfo info4 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "1", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// TableBlockInfo info5 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" }, -// file.length(),ColumnarFormatVersion.V3, null); -// TableBlockInfo info6 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "2", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// TableBlockInfo info7 = -// new TableBlockInfo(file.getAbsolutePath(), 0, "3", new String[] { "loclhost" }, -// file.length(), ColumnarFormatVersion.V3, null); -// -// CarbonTableIdentifier carbonTableIdentifier = -// new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = -// new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); -// ExecutorService executor = Executors.newFixedThreadPool(3); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); -// executor.submit( -// new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }), -// absoluteTableIdentifier)); -// executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }), -// absoluteTableIdentifier)); -// -// executor.shutdown(); -// try { -// executor.awaitTermination(1, TimeUnit.DAYS); -// } catch (InterruptedException e) { -// // TODO Auto-generated catch block -// e.printStackTrace(); -// } -// List<TableBlockInfo> tableBlockInfos = Arrays -// .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }); -// try { -// List<TableBlockUniqueIdentifier> blockUniqueIdentifierList = -// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); -// List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList); -// assertTrue(loadAndGetBlocks.size() == 8); -// } catch (Exception e) { -// assertTrue(false); -// } -// List<String> segmentIds = new ArrayList<>(); -// for (TableBlockInfo tableBlockInfo : tableBlockInfos) { -// segmentIds.add(tableBlockInfo.getSegment()); -// } -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); -// } private class BlockLoaderThread implements Callable<Void> { private List<TableBlockInfo> tableBlockInfoList; @@ -248,7 +92,7 @@ public class BlockIndexStoreTest extends TestCase { } private static File getPartFile() { - String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath() + String path = StoreCreator.getIdentifier().getTablePath() + "/Fact/Part0/Segment_0"; File file = new File(path); File[] files = file.listFiles();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 12b00db..f594585 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDictionaryWriter; import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; @@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; */ public class StoreCreator { - private static AbsoluteTableIdentifier absoluteTableIdentifier; + private static AbsoluteTableIdentifier identifier; private static String storePath = ""; static { try { storePath = new File("target/store").getCanonicalPath(); String dbName = "testdb"; String tableName = "testtable"; - absoluteTableIdentifier = + identifier = AbsoluteTableIdentifier.from( storePath + "/testdb/testtable", new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); @@ -114,8 +113,8 @@ public class StoreCreator { } } - public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() { - return absoluteTableIdentifier; + public static AbsoluteTableIdentifier getIdentifier() { + return identifier; } /** @@ -134,12 +133,12 @@ public class StoreCreator { CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); CarbonLoadModel loadModel = new CarbonLoadModel(); loadModel.setCarbonDataLoadSchema(schema); - loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); + loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); + loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); loadModel.setFactFilePath(factFilePath); loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); - loadModel.setTablePath(absoluteTableIdentifier.getTablePath()); + loadModel.setTablePath(identifier.getTablePath()); loadModel.setDateFormat(null); loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, @@ -175,9 +174,9 @@ public class StoreCreator { private static CarbonTable createTable() throws IOException { TableInfo tableInfo = new TableInfo(); - tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); + tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); TableSchema tableSchema = new TableSchema(); - tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName()); List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); ArrayList<Encoding> encodings = new ArrayList<>(); encodings.add(Encoding.DICTIONARY); @@ -257,16 +256,13 @@ public class StoreCreator { tableSchema.setSchemaEvalution(schemaEvol); tableSchema.setTableId(UUID.randomUUID().toString()); tableInfo.setTableUniqueName( - absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName() + identifier.getCarbonTableIdentifier().getTableUniqueName() ); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); - tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); + tableInfo.setTablePath(identifier.getTablePath()); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - String schemaFilePath = carbonTablePath.getSchemaFilePath(); + String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); CarbonMetadata.getInstance().loadTableMetadata(tableInfo); @@ -329,7 +325,7 @@ public class StoreCreator { writer.close(); writer.commit(); Dictionary dict = (Dictionary) dictCache.get( - new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, + new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier, dims.get(i).getDataType())); CarbonDictionarySortInfoPreparator preparator = new CarbonDictionarySortInfoPreparator(); @@ -444,7 +440,7 @@ public class StoreCreator { loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime())); listOfLoadFolderDetails.add(loadMetadataDetails); - String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator + String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator + CarbonTablePath.TABLE_STATUS_FILE; DataOutputStream dataOutputStream; http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 7b823ac..8c9889d 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; @@ -60,8 +59,6 @@ public class StreamSegment { * get stream segment or create new stream segment if not exists */ public static String open(CarbonTable table) throws IOException { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -72,7 +69,8 @@ public class StreamSegment { + " for stream table get or create segment"); LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); LoadMetadataDetails streamSegment = null; for (LoadMetadataDetails detail : details) { if (FileFormat.ROW_V1 == detail.getFileFormat()) { @@ -97,8 +95,8 @@ public class StreamSegment { newDetails[i] = details[i]; } newDetails[i] = newDetail; - SegmentStatusManager - .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails); + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails); return newDetail.getLoadName(); } else { return streamSegment.getLoadName(); @@ -126,8 +124,6 @@ public class StreamSegment { */ public static String close(CarbonTable table, String segmentId) throws IOException { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -138,7 +134,8 @@ public class StreamSegment { + " for stream table finish segment"); LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); for (LoadMetadataDetails detail : details) { if (segmentId.equals(detail.getLoadName())) { detail.setLoadEndTime(System.currentTimeMillis()); @@ -162,7 +159,8 @@ public class StreamSegment { } newDetails[i] = newDetail; SegmentStatusManager - .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails); + .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath( + table.getTablePath()), newDetails); return newDetail.getLoadName(); } else { LOGGER.error( @@ -192,7 +190,7 @@ public class StreamSegment { try { if (statusLock.lockWithRetries()) { LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); boolean updated = false; for (LoadMetadataDetails detail : details) { if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { @@ -202,10 +200,8 @@ public class StreamSegment { } } if (updated) { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); SegmentStatusManager.writeLoadDetailsIntoFile( - tablePath.getTableStatusFilePath(), + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()), details); } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 578f0cd..aa5ead2 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat @@ -223,7 +223,6 @@ object StreamHandoffRDD { ): Unit = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = carbonTable.getAbsoluteTableIdentifier - val tablePath = CarbonStorePath.getCarbonTablePath(identifier) var continueHandoff = false // require handoff lock on table val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) @@ -240,7 +239,7 @@ object StreamHandoffRDD { try { if (statusLock.lockWithRetries()) { loadMetadataDetails = SegmentStatusManager.readLoadMetadata( - tablePath.getMetadataDirectoryPath) + CarbonTablePath.getMetadataPath(identifier.getTablePath)) } } finally { if (null != statusLock) { @@ -378,19 +377,16 @@ object StreamHandoffRDD { loadModel: CarbonLoadModel ): Boolean = { var status = false - val metaDataFilepath = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath() - val identifier = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier() - val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier) - val metadataPath = carbonTablePath.getMetadataDirectoryPath() + val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath + val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier + val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath) val fileType = FileFactory.getFileType(metadataPath) if (!FileFactory.isFileExist(metadataPath, fileType)) { FileFactory.mkdirs(metadataPath, fileType) } - val tableStatusPath = carbonTablePath.getTableStatusFilePath() + val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath) val segmentStatusManager = new SegmentStatusManager(identifier) - val carbonLock = segmentStatusManager.getTableStatusLock() + val carbonLock = segmentStatusManager.getTableStatusLock try { if (carbonLock.lockWithRetries()) { LOGGER.info( @@ -424,7 +420,7 @@ object StreamHandoffRDD { status = true } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel - .getDatabaseName() + "." + loadModel.getTableName()); + .getDatabaseName() + "." + loadModel.getTableName()) } } finally { if (carbonLock.unlock()) { @@ -435,6 +431,6 @@ object StreamHandoffRDD { "." + loadModel.getTableName() + " during table status updation") } } - return status + status } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 75fcfb0..6316d84 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -127,16 +127,14 @@ object StreamSinkFactory { * @return */ private def getStreamSegmentId(carbonTable: CarbonTable): String = { - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath) - if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) { + val segmentId = StreamSegment.open(carbonTable) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + val fileType = FileFactory.getFileType(segmentDir) + if (!FileFactory.isFileExist(segmentDir, fileType)) { // Create table directory path, in case of enabling hive metastore first load may not have // table folder created. - FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType) + FileFactory.mkdirs(segmentDir, fileType) } - val segmentId = StreamSegment.open(carbonTable) - val segmentDir = carbonTablePath.getSegmentDir(segmentId) if (FileFactory.isFileExist(segmentDir, fileType)) { // recover fault StreamSegment.recoverSegmentIfRequired(segmentDir) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f06824e9/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 4f1c999..6e6d092 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -41,7 +41,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.stats.QueryStatistic import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil @@ -64,9 +64,7 @@ class CarbonAppendableStreamSink( carbonLoadModel: CarbonLoadModel, server: Option[DictionaryServer]) extends Sink { - private val carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - private val fileLogPath = carbonTablePath.getStreamingLogDir + private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath) private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath) // prepare configuration private val hadoopConf = { @@ -166,12 +164,12 @@ class CarbonAppendableStreamSink( * if the directory size of current segment beyond the threshold, hand off new segment */ private def checkOrHandOffSegment(): Unit = { - val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) val fileType = FileFactory.getFileType(segmentDir) if (segmentMaxSize <= StreamSegment.size(segmentDir)) { val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId) currentSegmentId = newSegmentId - val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId) + val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) FileFactory.mkdirs(newSegmentDir, fileType) // TODO trigger hand off operation @@ -270,15 +268,13 @@ object CarbonAppendableStreamSink { } // update data file info in index file - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId)) + StreamSegment.updateIndexFile( + CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)) } catch { // catch fault of executor side case t: Throwable => - val tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = tablePath.getSegmentDir(segmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) StreamSegment.recoverSegmentIfRequired(segmentDir) LOGGER.error(t, s"Aborting job ${ job.getJobID }.") committer.abortJob(job)