Geetika Gupta created CARBONDATA-2002:
-----------------------------------------

             Summary: Streaming segment status is not getting updated to finished or success
                 Key: CARBONDATA-2002
                 URL: https://issues.apache.org/jira/browse/CARBONDATA-2002
             Project: CarbonData
          Issue Type: Bug
          Components: data-load
    Affects Versions: 1.3.0
         Environment: spark2.1
            Reporter: Geetika Gupta
             Fix For: 1.3.0
         Attachments: 2000_UniqData.csv

I created a streaming table and loaded data into it using the following commands in the Spark shell:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")


carbon.sql("drop table if exists uniqdata_stream")

carbon.sql("""
  create table uniqdata_stream(
    CUST_ID int, CUST_NAME String, DOB timestamp, DOJ timestamp,
    BIGINT_COLUMN1 bigint, BIGINT_COLUMN2 bigint,
    DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),
    Double_COLUMN1 double, Double_COLUMN2 double,
    INTEGER_COLUMN1 int)
  STORED BY 'org.apache.carbondata.format'
  TBLPROPERTIES ('TABLE_BLOCKSIZE'='256 MB', 'streaming'='true')
""")
import carbon.sqlContext.implicits._

import org.apache.spark.sql.types._
val uniqdataSch = StructType(Array(
  StructField("CUST_ID", IntegerType),
  StructField("CUST_NAME", StringType),
  StructField("DOB", TimestampType),
  StructField("DOJ", TimestampType),
  StructField("BIGINT_COLUMN1", LongType),
  StructField("BIGINT_COLUMN2", LongType),
  StructField("DECIMAL_COLUMN1", DecimalType(30, 10)),
  StructField("DECIMAL_COLUMN2", DecimalType(36, 10)),
  StructField("Double_COLUMN1", DoubleType),
  StructField("Double_COLUMN2", DoubleType),
  StructField("INTEGER_COLUMN1", IntegerType)))

val streamDf = carbon.readStream
  .schema(uniqdataSch)
  .option("sep", ",")
  .csv("file:///home/geetika/Downloads/uniqdata")
val qry = streamDf.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", "/stream/uniq")
  .option("dbName", "default")
  .option("tableName", "uniqdata_stream")
  .start()

qry.awaitTermination()

// Press Ctrl+C to terminate

Start the Spark shell again and check the segment status:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore", "/tmp")

carbon.sql("show segments for table uniqdata_stream").show
carbon.sql("show segments for table uniqdata_stream").show

It shows the following output:
+-----------------+---------+--------------------+-------------+---------+-----------+
|SegmentSequenceId|   Status|     Load Start Time|Load End Time|Merged To|File Format|
+-----------------+---------+--------------------+-------------+---------+-----------+
|                0|Streaming|2018-01-05 18:23:...|         null|       NA|     ROW_V1|
+-----------------+---------+--------------------+-------------+---------+-----------+

The status of the segment is never updated to finished or success: it remains Streaming even after the streaming query has been terminated.
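For reference, one way to probe the stuck segment is to trigger a manual handoff with the streaming compaction command and then re-check the status. This is a hedged sketch, not a confirmed workaround for this bug; it assumes the `COMPACT 'streaming'` handoff documented in the CarbonData streaming guide and reuses the `carbon` session created above:

```scala
// Attempt a manual handoff of streaming segments to the columnar format.
// Segments still marked Streaming (rather than Streaming Finish) are
// expected to be skipped, which makes the stuck status visible.
carbon.sql("ALTER TABLE uniqdata_stream COMPACT 'streaming'")
carbon.sql("show segments for table uniqdata_stream").show(false)
```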


