[ https://issues.apache.org/jira/browse/SPARK-28999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ruiliang updated SPARK-28999:
-----------------------------
Description:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

How do you run a stateful computation in two passes, with the first pass maintaining per-order state and the second pass summarizing the processed state data? Is there any good way to do this other than routing the intermediate result through Kafka (structured streaming -> Kafka sink -> Kafka source -> structured streaming)? Thank you.

{code:java}
package org.roy.demo.streaming.bus

import java.sql.Timestamp

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming._

import streaming.StreamingExamples

object StructuredOrderStateListRturn {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  StreamingExamples.setStreamingLogLevels()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[*]")
      .appName("StructuredSessionization")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.200.102.192")
      .option("port", 9998)
      .load().withColumn("current_timestamp", current_timestamp)

    // Sample input:
    //100,1,10,20,2019-09-03
    //1001,1,10,200,2019-09-03
    //1001,1,10,2000,2019-09-03
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        // Use Option so malformed lines are dropped instead of emitting nulls.
        val orderInfo = line.split(",")
        if (orderInfo.length > 4) {
          Some(NOEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble,
            orderInfo(4), timestamp))
        } else {
          None
        }
      }

    /**
     * Pass 1: maintain per-order state and return the latest version of each order,
     * with the previous order amount attached.
     */
    val orderUpdates = events
      .groupByKey(event => event.orderId)
      // orderInfoStore = state type, orderInfoStoreUpdate = output type
      .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
        case (orderId: String, events: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
          if (state.hasTimedOut) {
            // The timeout fired: delete the data from the store and emit it with expired = true.
            val finalUpdate =
              orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, state.get.money,
                0.0, state.get.orderDate, state.get.timestamp, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Not timed out: if the order already exists, replace it with the new data.
            var oldOrder = 0.0 // amount of the previous version of this order
            val lastEvent = events.toSeq.last
            val updatedSession = if (state.exists) {
              oldOrder = state.get.money // the order exists, so remember its old amount
              orderInfoStore(orderId, lastEvent.otype, lastEvent.storeId, lastEvent.money,
                oldOrder, lastEvent.orderDate, lastEvent.timestamp)
            } else {
              orderInfoStore(orderId, lastEvent.otype, lastEvent.storeId, lastEvent.money,
                0, lastEvent.orderDate, lastEvent.timestamp)
            }
            // Update this order's entry in the state store.
            state.update(updatedSession)
            // Expire the state if no data is received for an hour.
            state.setTimeoutDuration("3600 seconds")
            orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, state.get.money,
              oldOrder, state.get.orderDate, state.get.timestamp, expired = false)
          }
      }

    /**
     * Pass 2: compute per-store totals from the per-order updates.
     * This second mapGroupsWithState on the same stream is what raises the
     * AnalysisException above.
     */
    val storeUpdate = orderUpdates
      .groupByKey(order => order.storeId)
      .mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
        case (storeId: String, events: Iterator[orderInfoStoreUpdate], state: GroupState[g1InfoStore]) =>
          if (state.hasTimedOut) {
            // The timeout fired: delete the data from the store and emit it with expired = true.
            val finalUpdate =
              g1InfoStoreUpdate(storeId, state.get.num, state.get.money, state.get.timestamp, expired = true)
            state.remove()
            finalUpdate
          } else {
            // Materialize the iterator once; it cannot be traversed more than once.
            val batch = events.toSeq
            val storeNum = batch.size
            var storeMoney = batch.map(_.money).sum // usually a single element
            val updatedStore = if (state.exists) { // the store already has state
              val old_order_moneys = batch.map(_.oldMoney).sum // usually a single element
              // store total += new order amount - old order amount
              storeMoney = state.get.money + storeMoney - old_order_moneys
              g1InfoStore(storeId, state.get.num + storeNum, storeMoney, state.get.timestamp)
            } else {
              // No state yet, so take the timestamp from the incoming events.
              g1InfoStore(storeId, storeNum, storeMoney, batch.last.timestamp)
            }
            // Update this store's entry in the state store.
            state.update(updatedStore)
            // Expire the state if no data is received for an hour.
            state.setTimeoutDuration("3600 seconds")
            g1InfoStoreUpdate(storeId, state.get.num, state.get.money, state.get.timestamp, expired = false)
          }
      }

    // Further aggregation of the per-store results (left commented out):
    // storeUpdate.createOrReplaceTempView("update_tmp")
    // spark.sql("select storeId,otype, count(1) num,sum(money) as moneys ,sum(oldMoney) as oldMoney from update_tmp group by storeId,otype ")

    val query = storeUpdate
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

/** User-defined data type representing the input events */
case class NOEvent(orderId: String, otype: Int, storeId: String, money: Double, orderDate: String, timestamp: Timestamp)

case class orderInfoStore(orderId: String, otype: Int, storeId: String, money: Double, oldMoney: Double, orderDate: String, timestamp: Timestamp)

case class orderInfoStoreUpdate(orderId: String, otype: Int, storeId: String, money: Double, oldMoney: Double, orderDate: String, timestamp: Timestamp, expired: Boolean)

/** State and output types for the store-level grouping */
case class g1InfoStore(storeid: String, /* otype: Int, */ num: Int, money: Double, timestamp: Timestamp)

case class g1InfoStoreUpdate(storeid: String, /* otype: Int, */ num: Int, money: Double, timestamp: Timestamp, expired: Boolean)
{code}
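For reference, the Kafka round trip mentioned above, i.e. splitting the pipeline into two queries connected by a topic so that each query contains only one mapGroupsWithState, would look roughly like the sketch below. This is only a hedged outline: the broker address, topic name, and checkpoint path are placeholders, and orderUpdates is the pass-1 Dataset from the program above.

{code:java}
// Hedged sketch of the Kafka round trip (broker, topic, and checkpoint path are
// placeholders). Query 1 publishes the per-order updates produced by pass 1.
import org.apache.spark.sql.functions.{col, struct, to_json}

orderUpdates
  .select(to_json(struct(col("*"))).as("value")) // the Kafka sink expects a 'value' column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "order-updates")
  .option("checkpointLocation", "/tmp/ckpt/order-stage")
  .start()

// Query 2 (possibly a separate application) reads the topic back, parses the
// JSON, and runs the per-store mapGroupsWithState on the result.
val storeInput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "order-updates")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")
{code}

Because each query then holds a single stateful operator, the AnalysisException no longer applies; the cost is the extra topic plus JSON serialization and deserialization in between.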
I changed the approach to maintain a map of the store's orders inside a single state object, and to recompute the totals from that map. However, this way the state keeps order-level detail for every store, which uses a lot of resources. I wonder if there is any other way to deal with this kind of data.

{code:java}
package org.roy.demo.streaming.bus

import java.sql.Timestamp

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming._

import streaming.StreamingExamples

/**
 * Created by Roy on 2019/09/06.
 * Counts the daily order number and amount per store.
 */
object StructuredStoreOrderState {

  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  StreamingExamples.setStreamingLogLevels()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[*]")
      .appName("StructuredSessionization")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.200.102.192")
      .option("port", 9998)
      .load().withColumn("current_timestamp", current_timestamp)

    // Sample input:
    //100,1,10,20,2019-09-03
    //1001,1,10,200,2019-09-03
    //1001,1,10,2000,2019-09-03
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) =>
        // Use Option so malformed lines are dropped instead of emitting nulls.
        val orderInfo = line.split(",")
        if (orderInfo.length > 4) {
          Some(dataEvent(orderInfo(0), orderInfo(1).toInt, orderInfo(2), orderInfo(3).toDouble,
            orderInfo(4), timestamp))
        } else {
          None
        }
      }

    val orderUpdates = events
      .groupByKey(event => event.storeId)
      .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
        case (key: String, values: Iterator[dataEvent], state: GroupState[storeOrderInfoState]) =>
          val seqs = values.toSeq
          val max_time = new Timestamp(System.currentTimeMillis())
          if (state.hasTimedOut) {
            val finalUpdate =
              storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, max_time, expired = true)
            state.remove()
            finalUpdate
          } else {
            val updatedSession = if (state.exists) {
              val stateMap = state.get.orderInfoStoreMap
              var norderMap2: Map[String, Double] = Map()
              var num = 0
              var money = 0.0
              seqs.foreach { e =>
                if (stateMap.contains(e.orderId)) {
                  // new amount minus old amount, added to the running total
                  money += e.money - stateMap(e.orderId)
                } else {
                  // note: assumes the same orderId does not repeat within one batch
                  num += 1
                  money += e.money
                }
                norderMap2 += (e.orderId -> e.money)
              }
              // Merge the stored orders with the incoming ones, deduplicating by orderId.
              storeOrderInfoState(key, state.get.orderNum + num, state.get.orderMoney + money,
                stateMap ++ norderMap2, max_time)
            } else {
              var norderMap2: Map[String, Double] = Map()
              var money = 0.0
              seqs.foreach { e =>
                money += e.money
                norderMap2 += (e.orderId -> e.money)
              }
              storeOrderInfoState(key, norderMap2.size, money, norderMap2, max_time)
            }
            // Update this store's entry in the state store.
            state.update(updatedSession)
            // Expire the state if no data is received for an hour.
            state.setTimeoutDuration("3600 seconds")
            storeOrderInfoStateUpdate(key, state.get.orderNum, state.get.orderMoney, max_time, expired = false)
          }
      }

    val query = orderUpdates
      .writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

/** User-defined data type representing the input events */
case class dataEvent(orderId: String, otype: Int, storeId: String, money: Double, orderTime: String, timestamp: Timestamp)

// Latest state of a single order
case class orderEnventInfo(orderId: String, otype: Int, storeId: String, money: Double, orderTime: String, timestamp: Timestamp)

// Per-store state that keeps a map of all of the store's orders
case class storeOrderInfoState(storeId: String, orderNum: Int, orderMoney: Double, orderInfoStoreMap: Map[String, Double], timestamp: Timestamp)

// Result returned after the computation
case class storeOrderInfoStateUpdate(storeId: String, orderNum: Int, orderMoney: Double, timestamp: Timestamp, expired: Boolean)
{code}

For example, assuming the three sample lines above arrive in separate triggers, store 10 ends up with orderNum = 2 and orderMoney = 20 + 2000 = 2020: the second occurrence of order 1001 replaces its earlier amount of 200 in the map instead of being counted twice.

The final output I need is this: !微信图片_20190906170459.png!
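As an aside, and purely my assumption rather than anything from the report: if upgrading past Spark 2.3.0 is an option, DataStreamWriter.foreachBatch (added in Spark 2.4) can run the commented-out update_tmp aggregation from the first program on each micro-batch of the pass-1 output, without a second stateful operator and without a Kafka round trip:

{code:java}
// Hedged sketch, assumes Spark 2.4+ (foreachBatch does not exist in 2.3.0).
// orderUpdates here is the per-order output of pass 1 in the first program.
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{count, sum}

orderUpdates.writeStream
  .outputMode("update")
  .foreachBatch { (batch: Dataset[orderInfoStoreUpdate], batchId: Long) =>
    // Mirrors the commented-out SQL: count and sum per store and order type.
    // This aggregates only the current micro-batch; running totals would still
    // have to be merged into an external store (e.g. a database).
    batch.groupBy("storeId", "otype")
      .agg(count("orderId").as("num"), sum("money").as("moneys"), sum("oldMoney").as("oldMoney"))
      .show()
  }
  .start()
{code}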
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple
> mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
> ------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28999
>                 URL: https://issues.apache.org/jira/browse/SPARK-28999
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: ruiliang
>            Priority: Major
>         Attachments: 微信图片_20190906170459.png