[ 
https://issues.apache.org/jira/browse/SPARK-28999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ruiliang updated SPARK-28999:
-----------------------------
    Description: 
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple 
mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

 

How can I chain two stateful operations, where the first maintains the 
per-order state and the second summarizes the output of the first? Is there a 
good way to do this without a round trip such as sink -> Kafka, Kafka source -> 
Spark Structured Streaming? Thank you.
{code:java}
//代码占位符
package org.roy.demo.streaming.bus

import java.sql.Timestamp
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming._
import streaming.StreamingExamples

/**
  * Two-stage stateful aggregation over a socket stream of order lines.
  *
  * Stage 1 keeps the latest version of every order (keyed by `orderId`) and emits it
  * together with the amount of the version it replaces. Stage 2 folds those updates into
  * a running order count and total amount per store (keyed by `storeId`).
  *
  * NOTE: the plan chains two `mapGroupsWithState` operators on one streaming Dataset;
  * Spark rejects that with the AnalysisException "Multiple mapGroupsWithStates are not
  * supported on a streaming DataFrames/Datasets".
  */
object StructuredOrderStateListRturn {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  StreamingExamples.setStreamingLogLevels()

  /** Idle period after which a group's state times out and is removed. */
  private val StateTimeout = "3600 seconds"

  /**
    * Parses one `orderId,otype,storeId,money,orderDate` line.
    *
    * @param line      raw comma-separated input line
    * @param timestamp time attached to the line when it was read
    * @return the parsed event, or `None` when the line has fewer than five fields or
    *         `otype` / `money` is not numeric
    */
  private def parseEvent(line: String, timestamp: Timestamp): Option[NOEvent] = {
    val fields = line.split(",")
    if (fields.length > 4) {
      // A malformed number must drop the line, not fail the whole streaming query.
      scala.util.Try(
        NOEvent(fields(0), fields(1).toInt, fields(2), fields(3).toDouble, fields(4), timestamp)
      ).toOption
    } else {
      None
    }
  }

  /**
    * Builds and starts the streaming query, then blocks until it terminates.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[*]")
      .appName("StructuredSessionization")
      .getOrCreate()
    import spark.implicits._

    // Stream of input lines from host:port, each tagged with the current timestamp.
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.200.102.192")
      .option("port", 9998)
      .load().withColumn("current_timestamp", current_timestamp)

    // Sample input:
    //100,1,10,20,2019-09-03
    //1001,1,10,200,2019-09-03
    //1001,1,10,2000,2019-09-03
    // flatMap over an Option-like result replaces "map to null, then filter".
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) => parseEvent(line, timestamp).toList }

    /**
      * Stage 1: keep the latest version of each order and emit it together with the
      * amount of the version it replaces.
      */
    val orderUpdates = events
      .groupByKey(event => event.orderId)
      // orderInfoStore = state type, orderInfoStoreUpdate = output type
      .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](
        GroupStateTimeout.ProcessingTimeTimeout) {
        case (orderId: String, events: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
          if (state.hasTimedOut) {
            // Timed out: emit the last known order flagged as expired and drop its state.
            val last = state.get
            state.remove()
            orderInfoStoreUpdate(orderId, last.otype, last.storeId, last.money, 0.0,
              last.orderDate, last.timestamp, expired = true)
          } else {
            // The newest event of the batch replaces whatever is stored for this order.
            val latest = events.toSeq.last
            // Amount of the version being replaced; 0.0 for a first-seen order.
            val previousMoney = if (state.exists) state.get.money else 0.0
            val updated = orderInfoStore(orderId, latest.otype, latest.storeId, latest.money,
              previousMoney, latest.orderDate, latest.timestamp)
            state.update(updated)
            // Expire the order if no data arrives for it within StateTimeout.
            state.setTimeoutDuration(StateTimeout)
            orderInfoStoreUpdate(orderId, updated.otype, updated.storeId, updated.money,
              previousMoney, updated.orderDate, updated.timestamp, expired = false)
          }
      }

    /**
      * Stage 2: per-store order count and total amount, computed from stage-1 output.
      */
    val storeUpdate = orderUpdates
      .groupByKey(order => order.storeId)
      .mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](
        GroupStateTimeout.ProcessingTimeTimeout) {
        case (storeId: String, events: Iterator[orderInfoStoreUpdate],
            state: GroupState[g1InfoStore]) =>
          if (state.hasTimedOut) {
            // Timed out: emit the last known totals flagged as expired and drop the state.
            val last = state.get
            state.remove()
            g1InfoStoreUpdate(storeId, last.num, last.money, last.timestamp, expired = true)
          } else {
            // An Iterator can be traversed only once, so materialise the batch first.
            val batch = events.toList
            val batchNum = batch.size
            val batchMoney = batch.map(_.money).sum
            // NOTE(review): every update counts as one order here, including re-sent and
            // expired ones from stage 1 -- confirm this is the intended count.
            val updated = if (state.exists) {
              val previous = state.get
              // Store total += new order amounts - amounts of the versions they replace.
              val replacedMoney = batch.map(_.oldMoney).sum
              g1InfoStore(storeId, previous.num + batchNum,
                previous.money + batchMoney - replacedMoney, previous.timestamp)
            } else {
              // No state yet, so there is no stored timestamp to read; use the batch's.
              g1InfoStore(storeId, batchNum, batchMoney, batch.last.timestamp)
            }
            state.update(updated)
            // Expire the store if no data arrives for it within StateTimeout.
            state.setTimeoutDuration(StateTimeout)
            g1InfoStoreUpdate(storeId, updated.num, updated.money, updated.timestamp,
              expired = false)
          }
      }

    // Further roll-up of the per-store results (disabled):
    //    storeUpdate.createOrReplaceTempView("update_tmp")
    //    spark.sql("select storeId,otype, count(1) num,sum(money) as moneys
    //    ,sum(oldMoney) as oldMoney  from update_tmp group by storeId,otype ")
    val query = storeUpdate
      .writeStream
      .outputMode("update")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

/**
  * One parsed input line, stamped with the time it was read.
  *
  * @param orderId   order identifier
  * @param otype     integer type code of the order
  * @param storeId   identifier of the store the order belongs to
  * @param money     order amount
  * @param orderDate order date, kept as the raw input string
  * @param timestamp time attached to the line by the stream
  */
case class NOEvent(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    orderDate: String,
    timestamp: Timestamp)

/**
  * State kept per order: its latest version plus the amount of the version it replaced.
  *
  * @param oldMoney amount of the previously stored version (0 for a first-seen order)
  */
case class orderInfoStore(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    oldMoney: Double,
    orderDate: String,
    timestamp: Timestamp)

/**
  * Output record of the per-order stage.
  *
  * @param oldMoney amount of the order version this update replaces
  * @param expired  true when the record was emitted because the order's state timed out
  */
case class orderInfoStoreUpdate(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    oldMoney: Double,
    orderDate: String,
    timestamp: Timestamp,
    expired: Boolean)

/**
  * State of the first grouping level (per store).
  *
  * @param storeid   store identifier
  * @param num       number of orders counted for the store
  * @param money     total amount for the store
  * @param timestamp timestamp stored with the state
  */
case class g1InfoStore(
    storeid: String,
    // otype: Int,  (disabled)
    num: Int,
    money: Double,
    timestamp: Timestamp)

/**
  * Output record of the per-store stage.
  *
  * @param expired true when the record was emitted because the store's state timed out
  */
case class g1InfoStoreUpdate(
    storeid: String,
    // otype: Int,  (disabled)
    num: Int,
    money: Double,
    timestamp: Timestamp,
    expired: Boolean)


{code}
 

 

 

I changed the approach: the store-level state now keeps the list of that 
store's orders, and the totals are recalculated against that list. However, 
this means the state holds the detail of every order row, which consumes a lot 
of resources. Is there any other way to handle this kind of data?
{code:java}
package org.roy.demo.streaming.bus

import java.sql.Timestamp
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming._
import streaming.StreamingExamples

/**
  * create by Roy 2019/09/06
  * Counts the number of orders and their total amount per store.
  *
  * A single `mapGroupsWithState` keyed by `storeId` keeps, for each store, a map of
  * `orderId -> latest amount`, so that a re-sent order replaces its earlier amount
  * instead of being counted again.
  *
  * NOTE: the per-store map holds every order seen until the state times out, so the
  * state size grows with the number of orders.
  */
object StructuredStoreOrderState {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  StreamingExamples.setStreamingLogLevels()

  /** Idle period after which a store's state times out and is removed. */
  private val StateTimeout = "3600 seconds"

  /**
    * Parses one `orderId,otype,storeId,money,orderTime` line.
    *
    * @param line      raw comma-separated input line
    * @param timestamp time attached to the line when it was read
    * @return the parsed event, or `None` when the line has fewer than five fields or
    *         `otype` / `money` is not numeric
    */
  private def parseEvent(line: String, timestamp: Timestamp): Option[dataEvent] = {
    val fields = line.split(",")
    if (fields.length > 4) {
      // A malformed number must drop the line, not fail the whole streaming query.
      scala.util.Try(
        dataEvent(fields(0), fields(1).toInt, fields(2), fields(3).toDouble, fields(4), timestamp)
      ).toOption
    } else {
      None
    }
  }

  /**
    * Builds and starts the streaming query, then blocks until it terminates.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[*]")
      .appName("StructuredSessionization")
      .getOrCreate()
    import spark.implicits._

    // Stream of input lines from host:port, each tagged with the current timestamp.
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.200.102.192")
      .option("port", 9998)
      .load().withColumn("current_timestamp", current_timestamp)

    // Sample input:
    //100,1,10,20,2019-09-03
    //1001,1,10,200,2019-09-03
    //1001,1,10,2000,2019-09-03
    // flatMap over an Option-like result replaces "map to null, then filter".
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) => parseEvent(line, timestamp).toList }

    val orderUpdates = events
      .groupByKey(event => event.storeId)
      .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](
        GroupStateTimeout.ProcessingTimeTimeout) {
        case (key: String, values: Iterator[dataEvent],
            state: GroupState[storeOrderInfoState]) =>
          // Wall-clock time of this invocation, stored and emitted with the result.
          val now = new Timestamp(System.currentTimeMillis())
          if (state.hasTimedOut) {
            // Timed out: emit the last known totals flagged as expired and drop the state.
            val last = state.get
            state.remove()
            storeOrderInfoStateUpdate(key, last.orderNum, last.orderMoney, now, expired = true)
          } else {
            val previous = state.getOption
            val startOrders =
              previous.map(_.orderInfoStoreMap).getOrElse(Map.empty[String, Double])
            val startNum = previous.map(_.orderNum).getOrElse(0)
            val startMoney = previous.map(_.orderMoney).getOrElse(0.0)

            // Apply the events one by one against the *running* map, so that an order id
            // repeated inside the same batch replaces its earlier amount instead of
            // being added twice.
            val (orders, orderNum, orderMoney) =
              values.foldLeft((startOrders, startNum, startMoney)) {
                case ((known, num, total), e) =>
                  known.get(e.orderId) match {
                    // Known order: total += new amount - old amount.
                    case Some(oldMoney) =>
                      (known.updated(e.orderId, e.money), num, total + e.money - oldMoney)
                    // New order: one more order, add its amount.
                    case None =>
                      (known.updated(e.orderId, e.money), num + 1, total + e.money)
                  }
              }

            val updatedSession = storeOrderInfoState(key, orderNum, orderMoney, orders, now)
            state.update(updatedSession)
            // Expire the store if no data arrives for it within StateTimeout.
            state.setTimeoutDuration(StateTimeout)
            storeOrderInfoStateUpdate(key, updatedSession.orderNum, updatedSession.orderMoney,
              now, expired = false)
          }
      }

    val query = orderUpdates
      .writeStream
      .outputMode("update")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

/**
  * One parsed input line, stamped with the time it was read.
  *
  * @param orderId   order identifier
  * @param otype     integer type code of the order
  * @param storeId   identifier of the store the order belongs to
  * @param money     order amount
  * @param orderTime order date/time, kept as the raw input string
  * @param timestamp time attached to the line by the stream
  */
case class dataEvent(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    orderTime: String,
    timestamp: Timestamp)

/** Latest state of a single order. */
case class orderEnventInfo(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    orderTime: String,
    timestamp: Timestamp)

/**
  * Per-store state, which maintains a table of all of the store's orders.
  *
  * Alternatives considered for the order table: `Map[String, orderEnventInfo]` or
  * `Seq[orderEnventInfo]`.
  *
  * @param storeId           store identifier
  * @param orderNum          number of orders counted for the store
  * @param orderMoney        total amount for the store
  * @param orderInfoStoreMap order table: `orderId -> amount`
  * @param timestamp         timestamp stored with the state
  */
case class storeOrderInfoState(
    storeId: String,
    orderNum: Int,
    orderMoney: Double,
    orderInfoStoreMap: Map[String, Double],
    timestamp: Timestamp)

/**
  * Computed result returned for a store.
  *
  * @param expired true when the record was emitted because the store's state timed out
  */
case class storeOrderInfoStateUpdate(
    storeId: String,
    orderNum: Int,
    orderMoney: Double,
    timestamp: Timestamp,
    expired: Boolean)



{code}
The final output I need is this

  !微信图片_20190906170459.png!

  was:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple 
mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;

 

How can I chain two stateful operations, where the first maintains the 
per-order state and the second summarizes the output of the first? Is there a 
good way to do this without a round trip such as sink -> Kafka, Kafka source -> 
Spark Structured Streaming? Thank you.
{code:java}
//代码占位符
package org.roy.demo.streaming.bus

import java.sql.Timestamp
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming._
import streaming.StreamingExamples

/**
  * Two-stage stateful aggregation over a socket stream of order lines.
  *
  * Stage 1 keeps the latest version of every order (keyed by `orderId`) and emits it
  * together with the amount of the version it replaces. Stage 2 folds those updates into
  * a running order count and total amount per store (keyed by `storeId`).
  *
  * NOTE: the plan chains two `mapGroupsWithState` operators on one streaming Dataset;
  * Spark rejects that with the AnalysisException "Multiple mapGroupsWithStates are not
  * supported on a streaming DataFrames/Datasets".
  */
object StructuredOrderStateListRturn {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  StreamingExamples.setStreamingLogLevels()

  /** Idle period after which a group's state times out and is removed. */
  private val StateTimeout = "3600 seconds"

  /**
    * Parses one `orderId,otype,storeId,money,orderDate` line.
    *
    * @param line      raw comma-separated input line
    * @param timestamp time attached to the line when it was read
    * @return the parsed event, or `None` when the line has fewer than five fields or
    *         `otype` / `money` is not numeric
    */
  private def parseEvent(line: String, timestamp: Timestamp): Option[NOEvent] = {
    val fields = line.split(",")
    if (fields.length > 4) {
      // A malformed number must drop the line, not fail the whole streaming query.
      scala.util.Try(
        NOEvent(fields(0), fields(1).toInt, fields(2), fields(3).toDouble, fields(4), timestamp)
      ).toOption
    } else {
      None
    }
  }

  /**
    * Builds and starts the streaming query, then blocks until it terminates.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[*]")
      .appName("StructuredSessionization")
      .getOrCreate()
    import spark.implicits._

    // Stream of input lines from host:port, each tagged with the current timestamp.
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.200.102.192")
      .option("port", 9998)
      .load().withColumn("current_timestamp", current_timestamp)

    // Sample input:
    //100,1,10,20,2019-09-03
    //1001,1,10,200,2019-09-03
    //1001,1,10,2000,2019-09-03
    // flatMap over an Option-like result replaces "map to null, then filter".
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) => parseEvent(line, timestamp).toList }

    /**
      * Stage 1: keep the latest version of each order and emit it together with the
      * amount of the version it replaces.
      */
    val orderUpdates = events
      .groupByKey(event => event.orderId)
      // orderInfoStore = state type, orderInfoStoreUpdate = output type
      .mapGroupsWithState[orderInfoStore, orderInfoStoreUpdate](
        GroupStateTimeout.ProcessingTimeTimeout) {
        case (orderId: String, events: Iterator[NOEvent], state: GroupState[orderInfoStore]) =>
          if (state.hasTimedOut) {
            // Timed out: emit the last known order flagged as expired and drop its state.
            val last = state.get
            state.remove()
            orderInfoStoreUpdate(orderId, last.otype, last.storeId, last.money, 0.0,
              last.orderDate, last.timestamp, expired = true)
          } else {
            // The newest event of the batch replaces whatever is stored for this order.
            val latest = events.toSeq.last
            // Amount of the version being replaced; 0.0 for a first-seen order.
            val previousMoney = if (state.exists) state.get.money else 0.0
            val updated = orderInfoStore(orderId, latest.otype, latest.storeId, latest.money,
              previousMoney, latest.orderDate, latest.timestamp)
            state.update(updated)
            // Expire the order if no data arrives for it within StateTimeout.
            state.setTimeoutDuration(StateTimeout)
            orderInfoStoreUpdate(orderId, updated.otype, updated.storeId, updated.money,
              previousMoney, updated.orderDate, updated.timestamp, expired = false)
          }
      }

    /**
      * Stage 2: per-store order count and total amount, computed from stage-1 output.
      */
    val storeUpdate = orderUpdates
      .groupByKey(order => order.storeId)
      .mapGroupsWithState[g1InfoStore, g1InfoStoreUpdate](
        GroupStateTimeout.ProcessingTimeTimeout) {
        case (storeId: String, events: Iterator[orderInfoStoreUpdate],
            state: GroupState[g1InfoStore]) =>
          if (state.hasTimedOut) {
            // Timed out: emit the last known totals flagged as expired and drop the state.
            val last = state.get
            state.remove()
            g1InfoStoreUpdate(storeId, last.num, last.money, last.timestamp, expired = true)
          } else {
            // An Iterator can be traversed only once, so materialise the batch first.
            val batch = events.toList
            val batchNum = batch.size
            val batchMoney = batch.map(_.money).sum
            // NOTE(review): every update counts as one order here, including re-sent and
            // expired ones from stage 1 -- confirm this is the intended count.
            val updated = if (state.exists) {
              val previous = state.get
              // Store total += new order amounts - amounts of the versions they replace.
              val replacedMoney = batch.map(_.oldMoney).sum
              g1InfoStore(storeId, previous.num + batchNum,
                previous.money + batchMoney - replacedMoney, previous.timestamp)
            } else {
              // No state yet, so there is no stored timestamp to read; use the batch's.
              g1InfoStore(storeId, batchNum, batchMoney, batch.last.timestamp)
            }
            state.update(updated)
            // Expire the store if no data arrives for it within StateTimeout.
            state.setTimeoutDuration(StateTimeout)
            g1InfoStoreUpdate(storeId, updated.num, updated.money, updated.timestamp,
              expired = false)
          }
      }

    // Further roll-up of the per-store results (disabled):
    //    storeUpdate.createOrReplaceTempView("update_tmp")
    //    spark.sql("select storeId,otype, count(1) num,sum(money) as moneys
    //    ,sum(oldMoney) as oldMoney  from update_tmp group by storeId,otype ")
    val query = storeUpdate
      .writeStream
      .outputMode("update")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

/**
  * One parsed input line, stamped with the time it was read.
  *
  * @param orderId   order identifier
  * @param otype     integer type code of the order
  * @param storeId   identifier of the store the order belongs to
  * @param money     order amount
  * @param orderDate order date, kept as the raw input string
  * @param timestamp time attached to the line by the stream
  */
case class NOEvent(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    orderDate: String,
    timestamp: Timestamp)

/**
  * State kept per order: its latest version plus the amount of the version it replaced.
  *
  * @param oldMoney amount of the previously stored version (0 for a first-seen order)
  */
case class orderInfoStore(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    oldMoney: Double,
    orderDate: String,
    timestamp: Timestamp)

/**
  * Output record of the per-order stage.
  *
  * @param oldMoney amount of the order version this update replaces
  * @param expired  true when the record was emitted because the order's state timed out
  */
case class orderInfoStoreUpdate(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    oldMoney: Double,
    orderDate: String,
    timestamp: Timestamp,
    expired: Boolean)

/**
  * State of the first grouping level (per store).
  *
  * @param storeid   store identifier
  * @param num       number of orders counted for the store
  * @param money     total amount for the store
  * @param timestamp timestamp stored with the state
  */
case class g1InfoStore(
    storeid: String,
    // otype: Int,  (disabled)
    num: Int,
    money: Double,
    timestamp: Timestamp)

/**
  * Output record of the per-store stage.
  *
  * @param expired true when the record was emitted because the store's state timed out
  */
case class g1InfoStoreUpdate(
    storeid: String,
    // otype: Int,  (disabled)
    num: Int,
    money: Double,
    timestamp: Timestamp,
    expired: Boolean)


{code}
 

 

 

I changed the approach: the store-level state now keeps the list of that 
store's orders, and the totals are recalculated against that list. However, 
this means the state holds the detail of every order row, which consumes a lot 
of resources. Is there any other way to handle this kind of data?
{code:java}
package org.roy.demo.streaming.bus

import java.sql.Timestamp
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming._
import streaming.StreamingExamples

/**
  * create by Roy 2019/09/06
  * Counts the number of orders and their total amount per store.
  *
  * A single `mapGroupsWithState` keyed by `storeId` keeps, for each store, a map of
  * `orderId -> latest amount`, so that a re-sent order replaces its earlier amount
  * instead of being counted again.
  *
  * NOTE: the per-store map holds every order seen until the state times out, so the
  * state size grows with the number of orders.
  */
object StructuredStoreOrderState {
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  StreamingExamples.setStreamingLogLevels()

  /** Idle period after which a store's state times out and is removed. */
  private val StateTimeout = "3600 seconds"

  /**
    * Parses one `orderId,otype,storeId,money,orderTime` line.
    *
    * @param line      raw comma-separated input line
    * @param timestamp time attached to the line when it was read
    * @return the parsed event, or `None` when the line has fewer than five fields or
    *         `otype` / `money` is not numeric
    */
  private def parseEvent(line: String, timestamp: Timestamp): Option[dataEvent] = {
    val fields = line.split(",")
    if (fields.length > 4) {
      // A malformed number must drop the line, not fail the whole streaming query.
      scala.util.Try(
        dataEvent(fields(0), fields(1).toInt, fields(2), fields(3).toDouble, fields(4), timestamp)
      ).toOption
    } else {
      None
    }
  }

  /**
    * Builds and starts the streaming query, then blocks until it terminates.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder.master("local[*]")
      .appName("StructuredSessionization")
      .getOrCreate()
    import spark.implicits._

    // Stream of input lines from host:port, each tagged with the current timestamp.
    val lines = spark.readStream
      .format("socket")
      .option("host", "10.200.102.192")
      .option("port", 9998)
      .load().withColumn("current_timestamp", current_timestamp)

    // Sample input:
    //100,1,10,20,2019-09-03
    //1001,1,10,200,2019-09-03
    //1001,1,10,2000,2019-09-03
    // flatMap over an Option-like result replaces "map to null, then filter".
    val events = lines
      .as[(String, Timestamp)]
      .flatMap { case (line, timestamp) => parseEvent(line, timestamp).toList }

    val orderUpdates = events
      .groupByKey(event => event.storeId)
      .mapGroupsWithState[storeOrderInfoState, storeOrderInfoStateUpdate](
        GroupStateTimeout.ProcessingTimeTimeout) {
        case (key: String, values: Iterator[dataEvent],
            state: GroupState[storeOrderInfoState]) =>
          // Wall-clock time of this invocation, stored and emitted with the result.
          val now = new Timestamp(System.currentTimeMillis())
          if (state.hasTimedOut) {
            // Timed out: emit the last known totals flagged as expired and drop the state.
            val last = state.get
            state.remove()
            storeOrderInfoStateUpdate(key, last.orderNum, last.orderMoney, now, expired = true)
          } else {
            val previous = state.getOption
            val startOrders =
              previous.map(_.orderInfoStoreMap).getOrElse(Map.empty[String, Double])
            val startNum = previous.map(_.orderNum).getOrElse(0)
            val startMoney = previous.map(_.orderMoney).getOrElse(0.0)

            // Apply the events one by one against the *running* map, so that an order id
            // repeated inside the same batch replaces its earlier amount instead of
            // being added twice.
            val (orders, orderNum, orderMoney) =
              values.foldLeft((startOrders, startNum, startMoney)) {
                case ((known, num, total), e) =>
                  known.get(e.orderId) match {
                    // Known order: total += new amount - old amount.
                    case Some(oldMoney) =>
                      (known.updated(e.orderId, e.money), num, total + e.money - oldMoney)
                    // New order: one more order, add its amount.
                    case None =>
                      (known.updated(e.orderId, e.money), num + 1, total + e.money)
                  }
              }

            val updatedSession = storeOrderInfoState(key, orderNum, orderMoney, orders, now)
            state.update(updatedSession)
            // Expire the store if no data arrives for it within StateTimeout.
            state.setTimeoutDuration(StateTimeout)
            storeOrderInfoStateUpdate(key, updatedSession.orderNum, updatedSession.orderMoney,
              now, expired = false)
          }
      }

    val query = orderUpdates
      .writeStream
      .outputMode("update")
      .format("console")
      .start()
    query.awaitTermination()
  }
}

/**
  * One parsed input line, stamped with the time it was read.
  *
  * @param orderId   order identifier
  * @param otype     integer type code of the order
  * @param storeId   identifier of the store the order belongs to
  * @param money     order amount
  * @param orderTime order date/time, kept as the raw input string
  * @param timestamp time attached to the line by the stream
  */
case class dataEvent(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    orderTime: String,
    timestamp: Timestamp)

/** Latest state of a single order. */
case class orderEnventInfo(
    orderId: String,
    otype: Int,
    storeId: String,
    money: Double,
    orderTime: String,
    timestamp: Timestamp)

/**
  * Per-store state, which maintains a table of all of the store's orders.
  *
  * Alternatives considered for the order table: `Map[String, orderEnventInfo]` or
  * `Seq[orderEnventInfo]`.
  *
  * @param storeId           store identifier
  * @param orderNum          number of orders counted for the store
  * @param orderMoney        total amount for the store
  * @param orderInfoStoreMap order table: `orderId -> amount`
  * @param timestamp         timestamp stored with the state
  */
case class storeOrderInfoState(
    storeId: String,
    orderNum: Int,
    orderMoney: Double,
    orderInfoStoreMap: Map[String, Double],
    timestamp: Timestamp)

/**
  * Computed result returned for a store.
  *
  * @param expired true when the record was emitted because the store's state timed out
  */
case class storeOrderInfoStateUpdate(
    storeId: String,
    orderNum: Int,
    orderMoney: Double,
    timestamp: Timestamp,
    expired: Boolean)



{code}
The final output I need is this

 


> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple 
> mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
> ------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-28999
>                 URL: https://issues.apache.org/jira/browse/SPARK-28999
>             Project: Spark
>          Issue Type: Question
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: ruiliang
>            Priority: Major
>         Attachments: 微信图片_20190906170459.png
>
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Multiple 
> mapGroupsWithStates are not supported on a streaming DataFrames/Datasets;;
>  
> How can I chain two stateful operations, where the first maintains the 
> per-order state and the second summarizes the output of the first? Is there a 
> good way to do this without a round trip such as sink -> Kafka, Kafka source 
> -> Spark Structured Streaming? Thank you.
> {code:java}
> //代码占位符
> package org.roy.demo.streaming.bus
> import java.sql.Timestamp
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.current_timestamp
> import org.apache.spark.sql.streaming._
> import streaming.StreamingExamples
> object StructuredOrderStateListRturn {
>   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>   StreamingExamples.setStreamingLogLevels()
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder.master("local[*]")
>       .appName("StructuredSessionization")
>       .getOrCreate()
>     import spark.implicits._
>     // Create DataFrame representing the stream of input lines from 
> connection to host:port
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "10.200.102.192")
>       .option("port", 9998)
>       .load().withColumn("current_timestamp", current_timestamp)
>     //100,1,10,20,2019-09-03
>     //1001,1,10,200,2019-09-03
>     //1001,1,10,2000,2019-09-03
>     val events = lines
>       .as[(String, Timestamp)]
>       .map { case (line, timestamp) => {
>         val orderInfo = line.split(",")
>         if (orderInfo != null && orderInfo.size > 4) {
>           val objEvent = NOEvent(orderInfo(0), orderInfo(1).toInt, 
> orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)
>           objEvent
>         } else {
>           null
>         }
>       }
>       }.filter(obj => obj != null)
>     /**
>       * -次维护订单状态数据,返回最新的一个订单,并把上一个订单金额附加返回
>       */
>     val orderUpdates = events
>       .groupByKey(event => event.orderId)
>       //orderInfoStore=输入的状态类型,orderInfoStoreUpdate=输出的状态类型
>       .mapGroupsWithState[orderInfoStore, 
> orderInfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>       case (orderId: String, events: Iterator[NOEvent], state: 
> GroupState[orderInfoStore]) =>
>         // 如果时间超时,更新缓存
>         if (state.hasTimedOut) {
>           //时间过了,删除sotre里的数据,把expired=true,返回在线时间等信息 0:00-0:10可以设这个时间,不计算,干掉数据
>           val finalUpdate =
>             orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, 
> state.get.money, 0.0, state.get.orderDate, state.get.timestamp, expired = 
> true)
>           state.remove()
>           finalUpdate
>         } else {
>           //订单没有超时,如果id存在,则替换掉,使用新的订单数据,或作别的操作
>           var oldOrder = 0.0 //上一笔的金额
>           val lastEnvent = events.toSeq.last
>           val updatedSession = if (state.exists) {
>             oldOrder = state.get.money
>             //存在,算出旧的金额是多少
>             orderInfoStore(orderId, lastEnvent.otype, lastEnvent.storeId, 
> lastEnvent.money, oldOrder, lastEnvent.orderDate, lastEnvent.timestamp)
>           } else {
>             orderInfoStore(orderId, lastEnvent.otype, lastEnvent.storeId, 
> lastEnvent.money, 0, lastEnvent.orderDate, lastEnvent.timestamp)
>           }
>           //更新缓存里面的这条数据信息
>           state.update(updatedSession)
>           // Set timeout such that the session will be expired if no data 
> received for 10 seconds
>           state.setTimeoutDuration("3600 seconds")
>           orderInfoStoreUpdate(orderId, state.get.otype, state.get.storeId, 
> state.get.money, oldOrder, state.get.orderDate, state.get.timestamp, expired 
> = false)
>         }
>     }
>     /**
>       * 二次计算出门店的数据
>       */
>     val storeUpdate = orderUpdates.groupByKey(order => 
> order.storeId).mapGroupsWithState[g1InfoStore, 
> g1InfoStoreUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>       case (storeId: String, events: Iterator[orderInfoStoreUpdate], state: 
> GroupState[g1InfoStore]) =>
>         // 如果时间超时,更新缓存
>         if (state.hasTimedOut) {
>           //时间过了,删除sotre里的数据,把expired=true,返回在线时间等信息 0:00-0:10可以设这个时间,不计算,干掉数据
>           val finalUpdate =
>             g1InfoStoreUpdate(storeId, state.get.num, state.get.money, 
> state.get.timestamp, expired = true)
>           state.remove()
>           finalUpdate
>         } else {
>           var storeNum = events.map(_.orderId).size
>           var storeMoney = events.map(_.money).reduce(_ + _) //其实只会一个
>           val updatedStore = if (state.exists) { //门店存在,
>             val old_order_moneys = events.map(_.oldMoney).reduce(_ + _) 
> //其实只会一个
>             //门店总客+=新订单金额-旧订单
>             storeMoney = state.get.money + storeMoney - old_order_moneys
>             g1InfoStore(storeId, state.get.num + events.map(_.orderId).size, 
> storeMoney, state.get.timestamp)
>           } else {
>             g1InfoStore(storeId, storeNum, storeMoney, state.get.timestamp)
>           }
>           //更新缓存里面的这条数据信息
>           state.update(updatedStore)
>           // Set timeout such that the session will be expired if no data 
> received for 10 seconds
>           state.setTimeoutDuration("3600 seconds")
>           g1InfoStoreUpdate(storeId, state.get.num, state.get.money, 
> state.get.timestamp, expired = true)
>         }
>     }
>     //门店统计好的数据在汇总
> //    storeUpdate.createOrReplaceTempView("update_tmp")
> //    spark.sql("select storeId,otype, count(1) num,sum(money) as moneys 
> ,sum(oldMoney) as oldMoney  from update_tmp group by storeId,otype ")
>     val query = storeUpdate
>       .writeStream
>       .outputMode("update")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> /** User-defined data type representing the input events */
> case class NOEvent(orderId: String, otype: Int, storeId: String, money: 
> Double, orderDate: String, timestamp: Timestamp)
> case class orderInfoStore(orderId: String, otype: Int, storeId: String, 
> money: Double, oldMoney: Double, orderDate: String, timestamp: Timestamp)
> case class orderInfoStoreUpdate(orderId: String, otype: Int, storeId: String, 
> money: Double, oldMoney: Double, orderDate: String, timestamp: Timestamp,
>                                 expired: Boolean)
> /** 第一个分组信息 */
> case class g1InfoStore(storeid: String,/* otype: Int,*/ num: Int, money: 
> Double, timestamp: Timestamp)
> case class g1InfoStoreUpdate(storeid: String,/* otype: Int,*/ num: Int, 
> money: Double, timestamp: Timestamp, expired: Boolean)
> {code}
>  
>  
>  
> I changed the approach: the store-level state now keeps the list of that 
> store's orders, and the totals are recalculated against that list. However, 
> this means the state holds the detail of every order row, which consumes a 
> lot of resources. Is there any other way to handle this kind of data?
> {code:java}
> package org.roy.demo.streaming.bus
> import java.sql.Timestamp
> import org.apache.log4j.{Level, Logger}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.current_timestamp
> import org.apache.spark.sql.streaming._
> import streaming.StreamingExamples
> /**
>   * create by Roy 2019/09/06
>   * Counting day order number and amount
>   */
> object StructuredStoreOrderState {
>   Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
>   StreamingExamples.setStreamingLogLevels()
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession
>       .builder.master("local[*]")
>       .appName("StructuredSessionization")
>       .getOrCreate()
>     import spark.implicits._
>     // Create DataFrame representing the stream of input lines from 
> connection to host:port
>     val lines = spark.readStream
>       .format("socket")
>       .option("host", "10.200.102.192")
>       .option("port", 9998)
>       .load().withColumn("current_timestamp", current_timestamp)
>     //100,1,10,20,2019-09-03
>     //1001,1,10,200,2019-09-03
>     //1001,1,10,2000,2019-09-03
>     val events = lines
>       .as[(String, Timestamp)]
>       .map { case (line, timestamp) => {
>         val orderInfo = line.split(",")
>         if (orderInfo != null && orderInfo.size > 4) {
>           val objEvent = dataEvent(orderInfo(0), orderInfo(1).toInt, 
> orderInfo(2), orderInfo(3).toDouble, orderInfo(4), timestamp)
>           objEvent
>         } else {
>           null
>         }
>       }
>       }.filter(obj => obj != null)
>     val orderUpdates = events
>       .groupByKey(event => event.storeId)
>       .mapGroupsWithState[storeOrderInfoState, 
> storeOrderInfoStateUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
>       case (key: String, values: Iterator[dataEvent], state: 
> GroupState[storeOrderInfoState]) =>
>         val seqs = values.toSeq
>         val times = seqs.map(_.timestamp).seq
>         val max_time = new Timestamp(System.currentTimeMillis())
>         if (state.hasTimedOut) {
>           val finalUpdate =
>             storeOrderInfoStateUpdate(key, state.get.orderNum, 
> state.get.orderMoney, max_time, expired = true)
>           state.remove()
>           finalUpdate
>         } else {
>           val updatedSession = if (state.exists) {
>             val stateMap = state.get.orderInfoStoreMap
>             var norderMap2: Map[String, Double] = Map()
>             var num = 0
>             var money = 0.0
>             seqs.foreach(e => {
>               if (stateMap.contains(e.orderId)) {
>                 //新订单-旧订单,再进行+总合
>                 money += e.money - stateMap.get(e.orderId).get
>               } else {
>                 num += 1
>                 money += e.money
>               }
>               norderMap2 += (e.orderId -> e.money)
>             })
>             //取出所有的订单+流进来的订单,需要判断是否有重复订单
>             storeOrderInfoState(key, state.get.orderNum + num, 
> state.get.orderMoney + money, stateMap ++ norderMap2, max_time)
>           } else {
>             var norderMap2: Map[String, Double] = Map()
>             var money = 0.0
>             seqs.foreach(e => {
>               money += e.money
>               norderMap2 += (e.orderId -> e.money)
>             })
>             storeOrderInfoState(key, norderMap2.size, money, norderMap2, 
> max_time)
>           }
>           //更新缓存里面的这条数据信息
>           println("updatedSession" + updatedSession)
>           println(updatedSession.orderMoney)
>           state.update(updatedSession)
>           // Set timeout such that the session will be expired if no data 
> received for 10 seconds
>           state.setTimeoutDuration("3600 seconds")
>           storeOrderInfoStateUpdate(key, state.get.orderNum, 
> state.get.orderMoney, max_time, expired = false)
>         }
>     }
>     val query = orderUpdates
>       .writeStream
>       .outputMode("update")
>       .format("console")
>       .start()
>     query.awaitTermination()
>   }
> }
> /** User-defined data type representing the input events */
> case class dataEvent(orderId: String, otype: Int, storeId: String, money: 
> Double, orderTime: String, timestamp: Timestamp)
> //最一个订单状态
> case class orderEnventInfo(orderId: String, otype: Int, storeId: String, 
> money: Double, orderTime: String, timestamp: Timestamp)
> //门店里面,维护一张所有订单,sotre  orderInfoStoreMap: Map[String, orderEnventInfo]  
> Seq[orderEnventInfo]
> case class storeOrderInfoState(storeId: String, orderNum: Int, orderMoney: 
> Double, orderInfoStoreMap: Map[String, Double], timestamp: Timestamp)
> //返回计算后的结果
> case class storeOrderInfoStateUpdate(storeId: String, orderNum: Int, 
> orderMoney: Double, timestamp: Timestamp,
>                                      expired: Boolean)
> {code}
> The final output I need is this
>   !微信图片_20190906170459.png!



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to