[ https://issues.apache.org/jira/browse/SPARK-22841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen resolved SPARK-22841. ------------------------------- Resolution: Invalid As the error says, there's no matching group in your regex, so this fails. Because it's a 'programmer error' and not a function of the data, this is the kind of result I'd expect rather than NULL or something. > Select regexp_extract from table with where clause having is null throws > indexoutofbounds exception > --------------------------------------------------------------------------------------------------- > > Key: SPARK-22841 > URL: https://issues.apache.org/jira/browse/SPARK-22841 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 2.1.0 > Reporter: Chetan Bhat > > Steps : > Thrift server is started using the command - bin/spark-submit --master > yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G > --num-executors 3 --class > org.apache.carbondata.spark.thriftserver.CarbonThriftServer > /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > "hdfs://hacluster/user/sparkhive/warehouse" > Spark shell is launched using the command - bin/spark-shell --master > yarn-client --executor-memory 10G --executor-cores 5 --driver-memory 5G > --num-executors 3 --jars > /srv/spark2.2Bigdata/install/spark/sparkJdbc/carbonlib/carbondata_2.11-1.3.0-SNAPSHOT-shade-hadoop2.7.2.jar > From Spark shell the streaming table is created and data is loaded to the > streaming table. > import java.io. > {File, PrintWriter} > import java.net.ServerSocket > import org.apache.spark.sql. > {CarbonEnv, SparkSession} > import org.apache.spark.sql.hive.CarbonRelation > import org.apache.spark.sql.streaming. > {ProcessingTime, StreamingQuery} > import org.apache.carbondata.core.constants.CarbonCommonConstants > import org.apache.carbondata.core.util.CarbonProperties > import org.apache.carbondata.core.util.path. 
> {CarbonStorePath, CarbonTablePath} > CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, > "yyyy/MM/dd") > import org.apache.spark.sql.CarbonSession._ > val carbonSession = SparkSession. > builder(). > appName("StreamExample"). > getOrCreateCarbonSession("hdfs://hacluster/user/hive/warehouse/carbon.store") > carbonSession.sparkContext.setLogLevel("INFO") > def sql(sql: String) = carbonSession.sql(sql) > def writeSocket(serverSocket: ServerSocket): Thread = { > val thread = new Thread() { > override def run(): Unit = { > // wait for client to connection request and accept > val clientSocket = serverSocket.accept() > val socketWriter = new PrintWriter(clientSocket.getOutputStream()) > var index = 0 > for (_ <- 1 to 1000) { > // write 5 records per iteration > for (_ <- 0 to 100) > { index = index + 1 socketWriter.println(index.toString + ",name_" + index + > ",city_" + index + "," + (index * 10000.00).toString + ",school_" + index + > ":school_" + index + index + "$" + index) } > socketWriter.flush() > Thread.sleep(2000) > } > socketWriter.close() > System.out.println("Socket closed") > } > } > thread.start() > thread > } > def startStreaming(spark: SparkSession, tablePath: CarbonTablePath, > tableName: String, port: Int): Thread = { > val thread = new Thread() { > override def run(): Unit = { > var qry: StreamingQuery = null > try > { val readSocketDF = spark.readStream .format("socket") .option("host", > "10.18.98.34") .option("port", port) .load() qry = readSocketDF.writeStream > .format("carbondata") .trigger(ProcessingTime("5 seconds")) > .option("checkpointLocation", tablePath.getStreamingCheckpointDir) > .option("tablePath", tablePath.getPath).option("tableName", tableName) > .start() qry.awaitTermination() } > catch > { case ex: Throwable => ex.printStackTrace() println("Done reading and > writing streaming data") } > finally > { qry.stop() } > } > } > thread.start() > thread > } > val streamTableName = "uniqdata" > 
sql(s"CREATE TABLE uniqdata (CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION > string, DOB timestamp, DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 > bigint,DECIMAL_COLUMN1 decimal(30,10), DECIMAL_COLUMN2 > decimal(36,36),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 > int) STORED BY 'org.apache.carbondata.format' > TBLPROPERTIES('streaming'='true')") > sql(s"LOAD DATA INPATH 'hdfs://hacluster/chetan/2000_UniqData.csv' into table > uniqdata OPTIONS( > 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,ACTIVE_EMUI_VERSION,DOB,DOJ,BIGINT_COLUMN1,BIGINT_COLUMN2,DECIMAL_COLUMN1,DECIMAL_COLUMN2,Double_COLUMN1,Double_COLUMN2,INTEGER_COLUMN1')") > val carbonTable = CarbonEnv.getInstance(carbonSession).carbonMetastore. > lookupRelation(Some("default"), > streamTableName)(carbonSession).asInstanceOf[CarbonRelation].carbonTable > val tablePath = > CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) > val port = 8006 > val serverSocket = new ServerSocket(port) > val socketThread = writeSocket(serverSocket) > val streamingThread = startStreaming(carbonSession, tablePath, > streamTableName, port) > From Beeline user executes the query > select regexp_extract(CUST_NAME,'a',1)from uniqdata where > regexp_extract(CUST_NAME,'a',1) IS NULL or regexp_extract(DOB,'b',2) is NULL; > Issue : Select regexp_extract from table with where clause having is null > throws indexoutofbounds exception > 0: jdbc:hive2://10.18.98.34:23040> select regexp_extract(CUST_NAME,'a',1)from > uniqdata where regexp_extract(CUST_NAME,'a',1) IS NULL or > regexp_extract(DOB,'b',2) is NULL; > Error: org.apache.spark.SparkException: Job aborted due to stage failure: > Task 1 in stage 198.0 failed 4 times, most recent failure: Lost task 1.3 in > stage 198.0 (TID 1634, BLR1000014269, executor 8): > java.lang.IndexOutOfBoundsException: No group 1 > at java.util.regex.Matcher.group(Matcher.java:538) > at > 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) > at > org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) > at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) > at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:99) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Driver stacktrace: (state=,code=0) > Expected : Select regexp_extract from table with where clause having is null > should be successful. -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org