[
https://issues.apache.org/jira/browse/CARBONDATA-1783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chetan Bhat updated CARBONDATA-1783:
------------------------------------
Description:
Steps:
The thrift server is started via spark-submit using the command:
bin/spark-submit --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --class org.apache.carbondata.spark.thriftserver.CarbonThriftServer /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar "hdfs://hacluster/user/hive/warehouse/carbon.store"
The Spark shell is launched using the command:
bin/spark-shell --master yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G --num-executors 3 --jars /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar
From the Spark shell the user creates a table and loads data into it as shown below.
import java.io.{File, PrintWriter}
import java.net.ServerSocket
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
import org.apache.spark.sql.CarbonSession._
val carbonSession = SparkSession.
  builder().
  appName("StreamExample").
  getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")
carbonSession.sparkContext.setLogLevel("INFO")
def sql(sql: String) = carbonSession.sql(sql)
def writeSocket(serverSocket: ServerSocket): Thread = {
  val thread = new Thread() {
    override def run(): Unit = {
      // wait for a client connection request and accept it
      val clientSocket = serverSocket.accept()
      val socketWriter = new PrintWriter(clientSocket.getOutputStream())
      var index = 0
      for (_ <- 1 to 1000) {
        // write 101 records per iteration (inner loop runs 0 to 100 inclusive)
        for (_ <- 0 to 100) {
          index = index + 1
          socketWriter.println(index.toString + ",name_" + index
            + ",city_" + index + "," + (index * 10000.00).toString
            + ",school_" + index + ":school_" + index + index
            + "$" + index)
        }
        socketWriter.flush()
        Thread.sleep(2000)
      }
      socketWriter.close()
      System.out.println("Socket closed")
    }
  }
  thread.start()
  thread
}
def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName: String, port: Int): Thread = {
  val thread = new Thread() {
    override def run(): Unit = {
      var qry: StreamingQuery = null
      try {
        val readSocketDF = spark.readStream
          .format("socket")
          .option("host", "10.18.98.34")
          .option("port", port)
          .load()
        qry = readSocketDF.writeStream
          .format("carbondata")
          .trigger(ProcessingTime("5 seconds"))
          .option("checkpointLocation", tablePath.getStreamingCheckpointDir)
          .option("tablePath", tablePath.getPath)
          .option("tableName", tableName)
          .start()
        qry.awaitTermination()
      } catch {
        case ex: Throwable =>
          ex.printStackTrace()
          println("Done reading and writing streaming data")
      } finally {
        // guard against NPE when the stream never started
        if (qry != null) {
          qry.stop()
        }
      }
    }
  }
  thread.start()
  thread
}
val streamTableName = "all_datatypes_2048"
sql(s"create table all_datatypes_2048 (imei string,deviceInformationId int,MAC
string,deviceColor string,device_backColor string,modelId string,marketName
string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series
string,productionDate timestamp,bomCode string,internalModels string,
deliveryTime string, channelsId string, channelsName string , deliveryAreaId
string, deliveryCountry string, deliveryProvince string, deliveryCity
string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string,
ActiveCheckTime string, ActiveAreaId string, ActiveCountry string,
ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet
string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion
string, Active_operaSysVersion string, Active_BacVerNumber string,
Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer
string,Active_webTypeDataVerNumber string, Active_operatorsVersion string,
Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int,
Latest_DAY Decimal(30,10), Latest_HOUR string, Latest_areaId string,
Latest_country string, Latest_province string, Latest_city string,
Latest_district string, Latest_street string, Latest_releaseId string,
Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber
string, Latest_BacFlashVer string, Latest_webUIVersion string,
Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string,
Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string,
Latest_operatorId string, gamePointDescription string,gamePointId
double,contractNumber BigInt) STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('streaming'='true','table_blocksize'='2048')")
sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/100_olap_C20.csv' INTO table
all_datatypes_2048 options ('DELIMITER'=',',
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')")
val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
  lookupRelation(Some("default"), streamTableName)(carbonSession).
  asInstanceOf[CarbonRelation].carbonTable
val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val port = 8007
val serverSocket = new ServerSocket(port)
val socketThread = writeSocket(serverSocket)
val streamingThread = startStreaming(carbonSession, tablePath, streamTableName, port)
While the streaming load is in progress, the user executes the below select filter query from Beeline:
select imei,gamePointId, channelsId,series from all_datatypes_2048 where channelsId >=10 OR channelsId <=1 and series='7Series';
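Note that AND binds tighter than OR in SQL, so the predicate above parses as channelsId >= 10 OR (channelsId <= 1 AND series = '7Series'). The explicitly parenthesized form below is a hypothetical rewrite, not part of the original reproduction; it makes the parse order visible and should exercise the same GreaterThanEqualToExpression filter path:
select imei,gamePointId, channelsId,series from all_datatypes_2048
where channelsId >= 10 OR (channelsId <= 1 AND series = '7Series');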
*Issue: The select filter query fails with an exception, as shown below.*
0: jdbc:hive2://10.18.98.34:23040> select imei,gamePointId, channelsId,series from all_datatypes_2048 where channelsId >=10 OR channelsId <=1 and series='7Series';
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 773.0 failed 4 times, most recent failure: Lost task 6.3 in stage 773.0 (TID 33727, BLR1000014269, executor 14): java.io.IOException: Failed to filter row in vector reader
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:423)
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextColumnarBatch(CarbonStreamRecordReader.java:317)
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.nextKeyValue(CarbonStreamRecordReader.java:298)
    at org.apache.carbondata.spark.rdd.CarbonScanRDD$$anon$1.hasNext(CarbonScanRDD.scala:298)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException: [B cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.SparkUnknownExpression.evaluate(SparkUnknownExpression.scala:50)
    at org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression.evaluate(GreaterThanEqualToExpression.java:38)
    at org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl.applyFilter(RowLevelFilterExecuterImpl.java:272)
    at org.apache.carbondata.core.scan.filter.executer.OrFilterExecuterImpl.applyFilter(OrFilterExecuterImpl.java:49)
    at org.apache.carbondata.hadoop.streaming.CarbonStreamRecordReader.scanBlockletAndFillVector(CarbonStreamRecordReader.java:418)
    ... 20 more
Driver stacktrace: (state=,code=0)
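The "Caused by" line shows SparkUnknownExpression.evaluate receiving a raw byte array ([B) where Spark's string handling expects org.apache.spark.unsafe.types.UTF8String, which suggests the streaming row-level filter hands the stored bytes to expression evaluation without conversion. A minimal Scala sketch of the cast failure follows, assuming only spark-unsafe on the classpath; the variable names are illustrative and not taken from the CarbonData source:
import org.apache.spark.unsafe.types.UTF8String

// A string column value held as raw bytes, the way a row format may store it.
val raw: Any = "7Series".getBytes("UTF-8")

// Casting the bytes directly reproduces the reported failure:
// raw.asInstanceOf[UTF8String]  // java.lang.ClassCastException: [B cannot be cast to org.apache.spark.unsafe.types.UTF8String

// Converting explicitly before evaluation avoids the cast:
val value: UTF8String = UTF8String.fromBytes(raw.asInstanceOf[Array[Byte]])
println(value)  // prints: 7Series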
Expected: The select filter query should succeed without any error or exception.
> (Carbon1.3.0 - Streaming) Error "Failed to filter row in vector reader" when filter query executed on streaming data
> --------------------------------------------------------------------------------------------------------------------
>
> Key: CARBONDATA-1783
> URL: https://issues.apache.org/jira/browse/CARBONDATA-1783
> Project: CarbonData
> Issue Type: Bug
> Components: data-query
> Affects Versions: 1.3.0
> Environment: 3 node ant cluster
> Reporter: Chetan Bhat
> Labels: DFX
>
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)