Github user zzcclp commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1867#discussion_r170958496
--- Diff:
streaming/src/main/scala/org/apache/carbondata/streaming/CarbonStreamSparkStreaming.scala
---
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.streaming
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.execution.streaming.CarbonAppendableStreamSink
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.Time
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock,
LockUsage}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+
+class CarbonStreamSparkStreamingWriter {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ // Flipped to true at the end of initialize(); checked before every write.
+ private var isInitialize: Boolean = false
+
+ // The references below start out as null (`_` is the default initializer) and are
+ // filled in by the auxiliary constructor, lockStreamTable() and initialize().
+ private var lock: ICarbonLock = _
+ private var carbonTable: CarbonTable = _
+ private var configuration: Configuration = _
+ private var carbonAppendableStreamSink: Sink = _
+ private val sparkSession: SparkSession = SparkSession.builder().getOrCreate()
+
+ /**
+  * Creates a writer bound to the given table.
+  *
+  * Stores the table and the Hadoop configuration, then passes the table's database
+  * name and table name to `option` under the keys "dbName" and "tableName".
+  *
+  * @param carbonTable   table this writer works on
+  * @param configuration configuration handed to the stream sink factory in `initialize()`
+  */
+ def this(carbonTable: CarbonTable, configuration: Configuration) {
+   this()
+   this.configuration = configuration
+   this.carbonTable = carbonTable
+   option("dbName", carbonTable.getDatabaseName)
+   option("tableName", carbonTable.getTableName)
+ }
+
+ /**
+  * Acquires the streaming lock for the stream table.
+  *
+  * The lock object is stored in `lock` only after `lockWithRetries()` has succeeded, so
+  * `unLockStreamTable()` never tries to release a lock this writer failed to obtain.
+  *
+  * @throws InterruptedException if the lock could not be acquired
+  */
+ def lockStreamTable(): Unit = {
+   val tableName = s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}"
+   val streamingLock = CarbonLockFactory.getCarbonLockObj(
+     carbonTable.getAbsoluteTableIdentifier, LockUsage.STREAMING_LOCK)
+   if (streamingLock.lockWithRetries()) {
+     lock = streamingLock
+     LOGGER.info(s"Acquired the lock for stream table: $tableName")
+   } else {
+     // Build the message once so the log entry and the exception stay identical
+     // (the log entry previously lacked the space after the colon).
+     val message = s"Not able to acquire the lock for stream table: $tableName"
+     LOGGER.error(message)
+     throw new InterruptedException(message)
+   }
+ }
+
+ /**
+  * Releases the stream table lock and logs the release; does nothing when no lock
+  * object is set.
+  */
+ def unLockStreamTable(): Unit = {
+   Option(lock).foreach { currentLock =>
+     currentLock.unlock()
+     val qualifiedName = carbonTable.getDatabaseName + "." + carbonTable.getTableName
+     LOGGER.info("unlock for stream table: " + qualifiedName)
+   }
+ }
+
+ /**
+  * Creates the stream sink for the table from the collected options, then acquires
+  * the stream table lock and marks this writer as initialized.
+  *
+  * The initialized flag is only set after `lockStreamTable()` returns normally.
+  */
+ def initialize(): Unit = {
+   val options = extraOptions.toMap
+   val sink = StreamSinkFactory.createStreamTableSink(
+     sparkSession, configuration, carbonTable, options)
+   carbonAppendableStreamSink = sink.asInstanceOf[CarbonAppendableStreamSink]
+   lockStreamTable()
+   isInitialize = true
+ }
+
+ /**
+  * Writes one batch through the stream sink, initializing the writer on first use.
+  *
+  * @param dataFrame data handed to the sink's `addBatch`
+  * @param time      batch time; its millisecond value is the first argument of `addBatch`
+  */
+ def writeStreamData(dataFrame: DataFrame, time: Time): Unit = {
+   val alreadyInitialized = isInitialize
+   if (!alreadyInitialized) initialize()
+   val batchMillis = time.milliseconds
+   carbonAppendableStreamSink.addBatch(batchMillis, dataFrame)
+ }
+
+ // Options collected for the sink; converted to an immutable map in initialize().
+ private val extraOptions = scala.collection.mutable.HashMap.empty[String, String]
+ // Current save mode; starts at the default SaveMode.ErrorIfExists.
+ private var mode: SaveMode = SaveMode.ErrorIfExists
+
+ /**
+  * Sets the save mode, i.e. the behavior when data or the table already exists:
+  *  - `SaveMode.Overwrite`: overwrite the existing data.
+  *  - `SaveMode.Append`: append the data.
+  *  - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
+  *  - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+  *
+  * The stored mode is replaced only while it still holds the default
+  * `SaveMode.ErrorIfExists`; once a different mode is stored, later calls leave it as is.
+  *
+  * @param saveMode mode to apply
+  * @return this writer, so calls can be chained
+  */
+ def mode(saveMode: SaveMode): CarbonStreamSparkStreamingWriter = {
+   val stillDefault = mode == SaveMode.ErrorIfExists
+   if (stillDefault) mode = saveMode
+   this
+ }
+
+ /**
+ * Specifies the behavior when data or table already exists. Options
include:
+ * - `SaveMode.Overwrite`: overwrite the existing data.
--- End diff --
Done
---