import akka.persistence._
import com.util.{CachedFTPClient}
import scala.concurrent.duration.{Duration, SECONDS}
import scala.util.{Try, Success, Failure }
import com.typesafe.config.Config
import com.common.ARCMessages._
import com.common.ARCProtocol._
import org.apache.commons.net.ftp.FTPClient
import akka.actor.{Actor, ActorRef, Props, ActorLogging}
import scala.concurrent.duration._

/** Companion of the [[IOWorker]] actor: its test-message protocol and `Props` factory. */
object IOWorker {

  /** Messages an [[IOWorker]] relays to, or accepts from, a test harness. */
  sealed trait IOWorkerTestMsg

  /** Wraps the message `msg`; DOIOActor sends it back just before attempting the upload. */
  case class SAVED(msg: Any) extends IOWorkerTestMsg

  /** Wraps the message `msg`; DOIOActor sends it back after the upload succeeded. */
  case class SUCCESS(msg: Any) extends IOWorkerTestMsg

  /**
   * Simulates a power-failure scenario: the IOWorker shall fail without taking a snapshot.
   * The next time the IOWorker comes back alive it should replay the same message.
   */
  case object killMe extends IOWorkerTestMsg

  /**
   * Builds the `Props` for an [[IOWorker]].
   *
   * @param config configuration handed to the worker's constructor
   * @param ref    actor that receives the relayed `SAVED` / `SUCCESS` messages;
   *               defaults to `null` when no such actor is supplied
   */
  def apply(config: Config, ref: ActorRef = null): Props =
    Props(classOf[IOWorker], config, ref)
}
/**
 * Processor that hands every persistent message it receives to a child `DOIOActor`
 * through a channel configured with a 10-second redelivery interval.
 *
 * `SAVED` / `SUCCESS` notifications coming back from the child are relayed to `ref`.
 *
 * @param config configuration passed on to the child `DOIOActor`; its hash code is also
 *               used to build the channel's actor name
 * @param ref    optional listener for `SAVED` / `SUCCESS`; may be `null`
 *               (the default used by `IOWorker.apply`), in which case nothing is relayed
 */
class IOWorker(config: Config, ref: ActorRef) 
    extends Actor with ActorLogging with Processor {
  import IOWorker._
  val channelName = "channel" + config.hashCode()
  val channel = context.actorOf(Channel.props(
      ChannelSettings(redeliverInterval = 10.seconds)), channelName)
  val doIOActor = context.actorOf(DOIOActor(config), name = "doIOActor")

  def receive = {
    case p: Persistent =>
      // `p` already carries its payload, so it can be delivered as-is.
      channel ! Deliver(p, doIOActor.path)
    case msg: IOWorkerTestMsg =>
      msg match {
        case saved: SAVED       => notifyListener(saved)
        case succeeded: SUCCESS => notifyListener(succeeded)
        // Qualified on purpose: a bare lowercase `killMe` in a pattern is a variable
        // pattern that matches *anything*, not the case object.
        case IOWorker.killMe    => context.stop(self)
      }
    case x => log.info(s"$x and sender is $sender")
  }

  /** Relays `msg` to `ref`, doing nothing when no listener was supplied (`ref == null`). */
  private def notifyListener(msg: IOWorkerTestMsg): Unit =
    Option(ref).foreach(_ ! msg)
}

/** Companion of [[DOIOActor]] providing its `Props` factory. */
object DOIOActor {

  /**
   * Builds the `Props` for a [[DOIOActor]].
   *
   * @param config configuration passed as the actor's only constructor argument
   */
  def apply(config: Config): Props = {
    Props(classOf[DOIOActor], config)
  }
}
/**
 * Stores the file carried by a confirmable persistent message on an FTP server.
 *
 * Expected payload: a `(message, Deadline)` tuple.
 *  - Deadline expired: the message is confirmed without any upload.
 *  - Deadline pending: `SAVED(message)` is sent to the sender, the upload is attempted,
 *    and on success `SUCCESS(message)` is sent and the message is confirmed. On failure
 *    the message is left unconfirmed and a warning is logged.
 *  - Any other payload shape: logged and confirmed, since it could never be processed.
 *
 * @param config must provide `host`, `username`, `password` and one
 *               `channel<N>Path` entry per channel that files are stored for
 */
class DOIOActor(config: Config) extends Actor 
    with ActorLogging {
  val host              = config.getString("host")
  val userName          = config.getString("username")
  val password          = config.getString("password")
  val cachedFtpClient   = new CachedFTPClient(10, host, userName, password)
  import IOWorker._

  def receive = {
    case p @ ConfirmablePersistent(payload, sequenceNr, redeliveries) =>
      payload match {
        case (msg, deadline: Deadline) =>
          log.debug(s"$msg \n timeleft ${deadline.hasTimeLeft} and trying $redeliveries times.")
          if (deadline.hasTimeLeft) store(p, msg)
          else p.confirm() // too late to be useful: acknowledge without uploading
        case unexpected =>
          // Without this case a malformed payload raised a MatchError and crashed the actor.
          log.warning(s"Discarding persistent message $sequenceNr with unsupported payload: $unexpected")
          p.confirm()
      }
  }

  /**
   * Uploads the file belonging to `msg` and confirms `p` once the upload succeeded.
   *
   * @param p   the persistent envelope to confirm on success
   * @param msg the message taken from the envelope's payload tuple
   */
  private def store(p: ConfirmablePersistent, msg: Any): Unit = {
    // Resolve the sender once, before the blocking FTP call.
    val replyTo = sender
    val file: AISData = msg match {
      case work: IOWork => work.data
      // NOTE(review): a non-IOWork message uploads this placeholder file;
      // maybe we should throw an exception instead.
      case _ => GNMFile("DUMMY","DUMMY", Array.emptyByteArray, 1)
    }
    replyTo ! SAVED(msg)
    Try {
      // Looked up inside the Try so a missing `channel<N>Path` key is reported like any
      // other upload failure instead of crashing the actor.
      val destination = config.getString(s"channel${file.channel}Path")
      cachedFtpClient{client => DOIO(client, destination, file)}
    } match {
      case Success(_) =>
        replyTo ! SUCCESS(msg)
        p.confirm()
      case Failure(doioException) =>
        // Deliberately not confirmed, so the message stays outstanding.
        log.warning(s"Could not complete DOIO. $doioException")
    }
  }

  /**
   * Stores `file` as `<destination>/<file.filename>` using `ftpClient`.
   *
   * @param ftpClient   FTP client used for the transfer
   * @param destination remote directory the file is written to
   * @param file        data to upload; its `payload` bytes become the file content
   * @throws Exception if the server reports that the file was not stored; the message
   *                   includes the server's reply string
   */
  def DOIO(ftpClient: FTPClient, destination: String, file: AISData): Unit = {
    val payload = new java.io.ByteArrayInputStream(file.payload)
    if (ftpClient.storeFile(destination + "/" + file.filename, payload))
      log.info(s"Storing file to $destination for file ${file.filename}")
    else
      throw new Exception(s"Could not DOIO to destination $destination for file ${file.filename} ${ftpClient.getReplyString()}")
  }

}

