Github user QiangCai commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1638#discussion_r156834294
--- Diff:
streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
---
@@ -180,6 +182,70 @@ public static String close(CarbonTable table, String
segmentId)
}
}
+ /**
+ * change the status of the segment from "streaming" to "streaming
finish"
+ */
+ public static void finishStreaming(CarbonTable carbonTable) throws
Exception {
+ ICarbonLock lock = CarbonLockFactory.getCarbonLockObj(
+ carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+ LockUsage.TABLE_STATUS_LOCK);
+ try {
+ if (lock.lockWithRetries()) {
+ ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj(
+
carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
+ LockUsage.STREAMING_LOCK);
+ try {
+ if (streamingLock.lockWithRetries()) {
+ LoadMetadataDetails[] details =
+
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath());
+ boolean updated = false;
+ for (LoadMetadataDetails detail : details) {
+ if (SegmentStatus.STREAMING == detail.getSegmentStatus()) {
+ detail.setLoadEndTime(System.currentTimeMillis());
+ detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH);
+ updated = true;
+ }
+ }
+ if (updated) {
+ CarbonTablePath tablePath =
+
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
+ SegmentStatusManager.writeLoadDetailsIntoFile(
+ tablePath.getTableStatusFilePath(), details);
+ }
+ } else {
+ String msg = "Failed to finish streaming, because streaming is
locked for table " +
+ carbonTable.getDatabaseName() + "." +
carbonTable.getTableName();
+ LOGGER.error(msg);
+ throw new Exception(msg);
+ }
+ } finally {
+ if (streamingLock.unlock()) {
+ LOGGER.info("Table unlocked successfully after streaming
finished" + carbonTable
+ .getDatabaseName() + "." + carbonTable.getTableName());
+ } else {
+ LOGGER.error("Unable to unlock Table lock for table " +
+ carbonTable.getDatabaseName() + "." +
carbonTable.getTableName() +
+ " during streaming finished");
+ }
+ }
+ } else {
+ String msg = "Failed to acquire table status lock of " +
+ carbonTable.getDatabaseName() + "." +
carbonTable.getTableName();
+ LOGGER.error(msg);
+ throw new Exception(msg);
--- End diff --
fixed
---