Geetika Gupta created CARBONDATA-2003:
-----------------------------------------
Summary: Streaming table is not updated on second streaming load
Key: CARBONDATA-2003
URL: https://issues.apache.org/jira/browse/CARBONDATA-2003
Project: CarbonData
Issue Type: Bug
Components: data-load
Affects Versions: 1.3.0
Environment: spark2.1
Reporter: Geetika Gupta
Fix For: 1.3.0
Attachments: 2000_UniqData.csv
I tried the following scenario in the Spark shell:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.types._
val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
carbon.sql("""CREATE TABLE uniqdata_stream_8(
  CUST_ID int, CUST_NAME String, ACTIVE_EMUI_VERSION string,
  DOB timestamp, DOJ timestamp,
  BIGINT_COLUMN1 bigint, BIGINT_COLUMN2 bigint,
  DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 decimal(36,10),
  Double_COLUMN1 double, Double_COLUMN2 double, INTEGER_COLUMN1 int)
  STORED BY 'org.apache.carbondata.format'
  TBLPROPERTIES ('TABLE_BLOCKSIZE'='256 MB', 'streaming'='true')""")
import carbon.sqlContext.implicits._
val uniqdataSch = StructType(Array(
  StructField("CUST_ID", IntegerType),
  StructField("CUST_NAME", StringType),
  StructField("ACTIVE_EMUI_VERSION", StringType),
  StructField("DOB", TimestampType),
  StructField("DOJ", TimestampType),
  StructField("BIGINT_COLUMN1", LongType),
  StructField("BIGINT_COLUMN2", LongType),
  StructField("DECIMAL_COLUMN1", org.apache.spark.sql.types.DecimalType(30, 10)),
  StructField("DECIMAL_COLUMN2", org.apache.spark.sql.types.DecimalType(36, 10)),
  StructField("Double_COLUMN1", DoubleType),
  StructField("Double_COLUMN2", DoubleType),
  StructField("INTEGER_COLUMN1", IntegerType)))
val streamDf = carbon.readStream
  .schema(uniqdataSch)
  .option("sep", ",")
  .csv("file:///home/geetika/Downloads/uniqdata")
val dfToWrite = streamDf.map { x =>
  x.get(0) + "," + x.get(1) + "," + x.get(2) + "," + x.get(3) + "," +
  x.get(4) + "," + x.get(5) + "," + x.get(6) + "," + x.get(7) + "," +
  x.get(8) + "," + x.get(9) + "," + x.get(10) + "," + x.get(11)
}
val qry = dfToWrite.writeStream
  .format("carbondata")
  .trigger(ProcessingTime("5 seconds"))
  .option("checkpointLocation", "/stream/uniq8")
  .option("dbName", "default")
  .option("tableName", "uniqdata_stream_8")
  .start()
qry.awaitTermination()
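To check whether a streaming run actually picked up any input before the shell is closed, the query can be inspected instead of blocking on it indefinitely. The following is only a minimal sketch, not part of the original report: `summarize` and `reportProgress` are hypothetical helper names, and the 30-second timeout in the usage note is an arbitrary assumption. It relies on the standard Spark `StreamingQuery` members `awaitTermination(timeoutMs)`, `isActive` and `lastProgress`.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: format the state of a streaming query as one line.
// lastBatchRows is None when no trigger has completed yet.
def summarize(terminated: Boolean, active: Boolean, lastBatchRows: Option[Long]): String = {
  val rows = lastBatchRows.map(_.toString).getOrElse("n/a")
  s"terminated=$terminated active=$active lastBatchInputRows=$rows"
}

// Hypothetical helper: wait up to timeoutMs for the query, then report
// how many input rows the most recent trigger processed.
// Note: awaitTermination rethrows the failure if the query terminated with an error.
def reportProgress(query: StreamingQuery, timeoutMs: Long): String =
  summarize(
    query.awaitTermination(timeoutMs),
    query.isActive,
    Option(query.lastProgress).map(_.numInputRows)) // lastProgress may be null
```

In the scenario above this could be called as `println(reportProgress(qry, 30000L))` in place of the blocking `qry.awaitTermination()`.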
Now close this shell, open a new one, and check the record count on the table using:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf)
  .getOrCreateCarbonSession("hdfs://localhost:54311/newCarbonStore","/tmp")
carbon.sql("select count(*) from uniqdata_stream_8").show
OUTPUT:
scala> carbon.sql("select count(*) from uniqdata_stream_8").show
18/01/08 15:51:53 ERROR CarbonProperties: Executor task launch worker-0
Configured value for property carbon.number.of.cores.while.loading is wrong.
Falling back to the default value 2
+--------+
|count(1)|
+--------+
|    2013|
+--------+
Run the above scenario again and check the count: it remains the same after the
second streaming load, i.e. the table is not updated.
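The comparison between two loads can be made explicit with a small helper. This is a minimal sketch under stated assumptions: `tableCount` and `loadDelta` are hypothetical names (not CarbonData APIs), and the earlier count has to be noted by hand because the shell is restarted between loads.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper: row count of a table as a Long.
def tableCount(spark: SparkSession, table: String): Long =
  spark.sql(s"select count(*) from $table").first().getLong(0)

// Hypothetical helper: describe whether a load added rows,
// given the counts observed before and after it.
def loadDelta(before: Long, after: Long): String =
  if (after > before) s"load added ${after - before} rows"
  else s"no new rows (count stayed at $after)"
```

For example, taking 2013 from the output above as the earlier count, `println(loadDelta(2013L, tableCount(carbon, "uniqdata_stream_8")))` would report whether the second streaming load changed the table.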