Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1566#discussion_r153093767
--- Diff:
streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffUtil.scala
---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import org.apache.spark.sql.SQLContext
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails,
SegmentStatus,
+SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.HandoffResultImpl
+import org.apache.carbondata.spark.util.CommonUtil
+
+object StreamHandoffUtil {
+
+ private val LOGGER =
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ /**
+ * start new thread to execute stream segment handoff
+ */
+ def startStreamingHandoffThread(
+ carbonLoadModel: CarbonLoadModel,
+ sqlContext: SQLContext,
+ storeLocation: String
+ ): Unit = {
+ // start a new thread to execute streaming segment handoff
+ val handoffThread = new Thread() {
+ override def run(): Unit = {
+ val carbonTable =
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val identifier = carbonTable.getAbsoluteTableIdentifier
+ val tablePath = CarbonStorePath.getCarbonTablePath(identifier)
+ var continueHandoff = false
+ // require handoff lock on table
+ val lock = CarbonLockFactory.getCarbonLockObj(identifier,
LockUsage.HANDOFF_LOCK)
+ try {
+ if (lock.lockWithRetries()) {
+ LOGGER.info("Acquired the handoff lock for table" +
+ s" ${ carbonTable.getDatabaseName }.${
carbonTable.getTableName }")
+ // handoff streaming segment one by one
+ do {
+ val segmentStatusManager = new
SegmentStatusManager(identifier)
+ var loadMetadataDetails: Array[LoadMetadataDetails] = null
+ val statusLock = segmentStatusManager.getTableStatusLock
+ try {
+ if (statusLock.lockWithRetries()) {
--- End diff --
Please add a comment describing what this lock (the table status lock acquired via `statusLock.lockWithRetries()`) is for.
---