Chetan Bhat created CARBONDATA-1782:
---------------------------------------
Summary: (Carbon1.3.0 - Streaming) Select regexp_extract from
table with where clause having is null throws indexoutofbounds exception
Key: CARBONDATA-1782
URL: https://issues.apache.org/jira/browse/CARBONDATA-1782
Project: CarbonData
Issue Type: Bug
Components: data-query
Affects Versions: 1.3.0
Environment: 3 node ant cluster
Reporter: Chetan Bhat
Steps :
Thrift server is started using the command - bin/spark-submit --master
yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G
--num-executors 3 --class
org.apache.carbondata.spark.thriftserver.CarbonThriftServer
/srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar
"hdfs://hacluster/user/sparkhive/warehouse"
Spark shell is launched using the command - bin/spark-shell --master
yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G
--num-executors 3 --jars
/srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar
>From Spark shell the streaming table is created and data is loaded to the
>streaming table.
import java.io.{File, PrintWriter}
import java.net.ServerSocket
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
"yyyy/MM/dd")
import org.apache.spark.sql.CarbonSession._
val carbonSession = SparkSession.
builder().
appName("StreamExample").
getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store")
carbonSession.sparkContext.setLogLevel("INFO")
def sql(sql: String) = carbonSession.sql(sql)
def writeSocket(serverSocket: ServerSocket): Thread = {
val thread = new Thread() {
override def run(): Unit = {
// wait for client to connection request and accept
val clientSocket = serverSocket.accept()
val socketWriter = new PrintWriter(clientSocket.getOutputStream())
var index = 0
for (_ <- 1 to 1000) {
// write 5 records per iteration
for (_ <- 0 to 100) {
index = index + 1
socketWriter.println(index.toString + ",name_" + index
+ ",city_" + index + "," + (index *
10000.00).toString +
",school_" + index + ":school_" + index + index
+ "$" + index)
}
socketWriter.flush()
Thread.sleep(2000)
}
socketWriter.close()
System.out.println("Socket closed")
}
}
thread.start()
thread
}
def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, tableName:
String, port: Int): Thread = {
val thread = new Thread() {
override def run(): Unit = {
var qry: StreamingQuery = null
try {
val readSocketDF = spark.readStream
.format("socket")
.option("host", "10.18.98.34")
.option("port", port)
.load()
qry = readSocketDF.writeStream
.format("carbondata")
.trigger(ProcessingTime("5 seconds"))
.option("checkpointLocation", tablePath.getStreamingCheckpointDir)
.option("tablePath", tablePath.getPath).option("tableName",
tableName)
.start()
qry.awaitTermination()
} catch {
case ex: Throwable =>
ex.printStackTrace()
println("Done reading and writing streaming data")
} finally {
qry.stop()
}
}
}
thread.start()
thread
}
val streamTableName = "uniqdata"
sql(s"CREATE TABLE uniqdata (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION
string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2
bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2
decimal(36,36),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1
int) STORED BY 'org.apache.carbondata.format'
TBLPROPERTIES('streaming'='true')")
sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/2000_UniqData.csv' into table
uniqdata OPTIONS(
'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')")
val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore.
lookupRelation(Some("default"),
streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable
val tablePath =
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val port = 8006
val serverSocket = new ServerSocket(port)
val socketThread = writeSocket(serverSocket)
val streamingThread = startStreaming(carbonSession, tablePath, streamTableName,
port)
>From Beeline user executes the query
select regexp_extract(CUST_NAME,'a',1)from uniqdata where
regexp_extract(CUST_NAME,'a',1) IS NULL or regexp_extract(DOB,'b',2) is NULL;
Issue : Select regexp_extract from table with where clause having is null
throws indexoutofbounds exception
0: jdbc:hive2://10.18.98.34:23040> select regexp_extract(CUST_NAME,'a',1)from
uniqdata where regexp_extract(CUST_NAME,'a',1) IS NULL or
regexp_extract(DOB,'b',2) is NULL;
Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task
1 in stage 198.0 failed 4 times, most recent failure: Lost task 1.3 in stage
198.0 (TID 1634, BLR1000014269, executor 8):
java.lang.IndexOutOfBoundsException: No group 1
at java.util.regex.Matcher.group(Matcher.java:538)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace: (state=,code=0)
Expected : Select regexp_extract from table with where clause having is null
should be successful.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)