add s2graph sink for spark structured streaming

Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/b14b293d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/b14b293d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/b14b293d

Branch: refs/heads/master
Commit: b14b293d478571dbca7936c64adbe119e564fc43
Parents: e0ba0aa
Author: Chul Kang <[email protected]>
Authored: Thu Mar 22 18:53:18 2018 +0900
Committer: Chul Kang <[email protected]>
Committed: Thu Mar 29 12:02:57 2018 +0900

----------------------------------------------------------------------
 .../org/apache/s2graph/s2jobs/task/Sink.scala   |  13 +++
 .../s2graph/spark/sql/streaming/Logger.scala    |   7 ++
 .../spark/sql/streaming/S2CommitProtocol.scala  |  48 ++++++++
 .../spark/sql/streaming/S2SinkConfigs.scala     |  28 +++++
 .../spark/sql/streaming/S2SinkContext.scala     |  25 ++++
 .../spark/sql/streaming/S2SinkMetadataLog.scala |  25 ++++
 .../spark/sql/streaming/S2SinkProvider.scala    |  26 +++++
 .../spark/sql/streaming/S2SinkStatus.scala      |   9 ++
 .../sql/streaming/S2SparkSqlStreamingSink.scala |  71 ++++++++++++
 .../sql/streaming/S2StreamQueryWriter.scala     | 114 +++++++++++++++++++
 10 files changed, 366 insertions(+)
----------------------------------------------------------------------
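
For orientation, here is a minimal sketch of how a structured streaming query might write to the new sink. The input path, checkpoint location, schema, and option values are placeholders; the provider class name, option keys, and expected columns come from S2SinkProvider, S2SinkConfigs, and S2StreamQueryWriter below.

    // Sketch only: paths, schema, and option values below are hypothetical.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types._

    object S2GraphSinkExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("s2graph-sink-example").getOrCreate()

        // Rows must carry the columns S2StreamQueryWriter reads: timestamp, elem, from, to,
        // label, operation, direction (for edges) and an optional "props" json column.
        val schema = StructType(Seq(
          StructField("timestamp", LongType), StructField("elem", StringType),
          StructField("from", StringType), StructField("to", StringType),
          StructField("label", StringType), StructField("operation", StringType),
          StructField("direction", StringType), StructField("props", StringType)))

        val edges = spark.readStream.format("json").schema(schema).load("/tmp/edges-in")

        val query = edges.writeStream
          .format("org.apache.s2graph.spark.sql.streaming.S2SinkProvider") // or "s2graph" if the provider is service-registered
          .option("checkpointLocation", "/tmp/s2graph-checkpoint")
          // the sink commit log path can also be set explicitly:
          // .option("s2.spark.sql.streaming.sink.logpath", "/tmp/s2graph-sink-log")
          // s2graph connection settings fall back to the classpath config, or can be passed as options, e.g.:
          // .option("hbase.zookeeper.quorum", "localhost")
          .option("s2.spark.sql.streaming.sink.grouped.size", "100")
          .option("s2.spark.sql.streaming.sink.wait.time", "5")
          .outputMode(OutputMode.Append())
          .start()

        query.awaitTermination()
      }
    }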


http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
index 6249891..0b83a6a 100644
--- a/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
+++ b/s2jobs/src/main/scala/org/apache/s2graph/s2jobs/task/Sink.scala
@@ -163,3 +163,16 @@ class ESSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
   }
 }
 
+/**
+  * S2graphSink: streaming sink task backed by the s2graph structured streaming sink provider.
+  * @param queryName name of the query
+  * @param conf      task configuration
+  */
+class S2graphSink(queryName:String, conf:TaskConf) extends Sink(queryName, conf) {
+  override def mandatoryOptions: Set[String] = Set()
+  override val FORMAT: String = "org.apache.s2graph.spark.sql.streaming.S2SinkProvider"
+
+  override protected def writeBatch(writer: DataFrameWriter[Row]): Unit =
+    throw new RuntimeException(s"unsupported source type for ${this.getClass.getSimpleName} : ${conf.name}")
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala
new file mode 100644
index 0000000..06232c5
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/Logger.scala
@@ -0,0 +1,7 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import org.apache.commons.logging.{Log, LogFactory}
+
+trait Logger {
+  @transient lazy val logger: Log = LogFactory.getLog(this.getClass)
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala
new file mode 100644
index 0000000..162b9ae
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2CommitProtocol.scala
@@ -0,0 +1,48 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import org.apache.spark.sql.execution.streaming.MetadataLog
+
+class S2CommitProtocol(@transient val commitLog: MetadataLog[Array[S2SinkStatus]])
+  extends Serializable with Logger {
+
+  def initJob(jobState: JobState): Unit = {
+    logger.debug(s"[InitJob] ${jobState}")
+  }
+
+  def commitJob(jobState: JobState, taskCommits: Seq[TaskCommit]): Unit = {
+    val commits = taskCommits.flatMap(_.statuses).toArray[S2SinkStatus]
+    if (commitLog.add(jobState.batchId, commits)) {
+      logger.debug(s"[Committed batch] ${jobState.batchId}  (${taskCommits})")
+    } else {
+      throw new IllegalStateException(s"Batch Id [${jobState.batchId}] is already committed")
+    }
+  }
+
+  def abortJob(jobState: JobState): Unit = {
+    logger.info(s"[AbortJob] ${jobState}")
+  }
+
+  @transient var executionStart: Long = _
+
+  def initTask(): Unit = {
+    executionStart = System.currentTimeMillis()
+  }
+
+  def commitTask(taskState: TaskState): TaskCommit = {
+    TaskCommit(Some(S2SinkStatus(taskState.taskId, executionStart, taskState.successCnt, taskState.failCnt, taskState.counter)))
+  }
+
+  def abortTask(taskState: TaskState): Unit = {
+    logger.info(s"[AbortTask] ${taskState}")
+    TaskCommit(None)
+  }
+}
+
+case class JobState(jobId: String, batchId: Long, dataCnt: Long = -1L)
+case class TaskState(
+                      taskId: Int,
+                      successCnt:Long = -1L,
+                      failCnt:Long = -1L,
+                      counter:Map[String, Int] = Map.empty
+                    )
+case class TaskCommit(statuses: Option[S2SinkStatus])

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
new file mode 100644
index 0000000..c24df5c
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkConfigs.scala
@@ -0,0 +1,28 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.Config
+
+import scala.util.Try
+
+object S2SinkConfigs {
+  val DB_DEFAULT_URL = "db.default.url"
+  val HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"
+
+
+  val DEFAULT_GROUPED_SIZE = "100"
+  val DEFAULT_WAIT_TIME_SECONDS = "5"
+
+  val S2_SINK_PREFIX = "s2.spark.sql.streaming.sink"
+  val S2_SINK_QUERY_NAME = s"$S2_SINK_PREFIX.queryname"
+  val S2_SINK_LOG_PATH = s"$S2_SINK_PREFIX.logpath"
+  val S2_SINK_CHECKPOINT_LOCATION = "checkpointlocation"
+  val S2_SINK_FILE_CLEANUP_DELAY = s"$S2_SINK_PREFIX.file.cleanup.delay"
+  val S2_SINK_DELETE_EXPIRED_LOG = s"$S2_SINK_PREFIX.delete.expired.log"
+  val S2_SINK_COMPACT_INTERVAL = s"$S2_SINK_PREFIX.compact.interval"
+  val S2_SINK_GROUPED_SIZE = s"$S2_SINK_PREFIX.grouped.size"
+  val S2_SINK_WAIT_TIME = s"$S2_SINK_PREFIX.wait.time"
+
+  def getConfigStringOpt(config:Config, path:String): Option[String] = Try(config.getString(path)).toOption
+
+  def getConfigString(config:Config, path:String, default:String): String = getConfigStringOpt(config, path).getOrElse(default)
+}
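
A quick sketch of how these helpers behave (the key/value pairs below are illustrative only): config.getString throws on a missing path, so getConfigStringOpt turns that into None and getConfigString substitutes the given default.

    import com.typesafe.config.ConfigFactory
    import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._

    // Only grouped.size is set; wait.time and logpath are absent.
    val conf = ConfigFactory.parseString("""s2.spark.sql.streaming.sink.grouped.size = "200" """)

    val grouped  = getConfigString(conf, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt   // 200
    val waitTime = getConfigString(conf, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt // 5 (default)
    val logPath  = getConfigStringOpt(conf, S2_SINK_LOG_PATH)                                // None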

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
new file mode 100644
index 0000000..74b3308
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkContext.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.Config
+import org.apache.s2graph.core.S2Graph
+
+import scala.concurrent.ExecutionContext
+
+class S2SinkContext(config: Config)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global){
+  println(s">>>> S2SinkContext Created...")
+  private lazy val s2Graph = new S2Graph(config)
+  def getGraph: S2Graph = {
+    s2Graph
+  }
+}
+
+object S2SinkContext {
+  private var s2SinkContext:S2SinkContext = null
+
+  def apply(config:Config):S2SinkContext = {
+    if (s2SinkContext == null) {
+      s2SinkContext = new S2SinkContext(config)
+    }
+    s2SinkContext
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala
new file mode 100644
index 0000000..eb985f1
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkMetadataLog.scala
@@ -0,0 +1,25 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.Config
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog
+
+class S2SinkMetadataLog(sparkSession: SparkSession, config:Config, logPath:String)
+  extends CompactibleFileStreamLog[S2SinkStatus](S2SinkMetadataLog.VERSION, sparkSession, logPath) {
+  import S2SinkConfigs._
+
+  override protected def fileCleanupDelayMs: Long = getConfigStringOpt(config, S2_SINK_FILE_CLEANUP_DELAY)
+                                                      .getOrElse("60").toLong
+
+  override protected def isDeletingExpiredLog: Boolean = getConfigStringOpt(config, S2_SINK_DELETE_EXPIRED_LOG)
+                                                            .getOrElse("false").toBoolean
+
+  override protected def defaultCompactInterval: Int = getConfigStringOpt(config, S2_SINK_COMPACT_INTERVAL)
+                                                          .getOrElse("60").toInt
+
+  override def compactLogs(logs: Seq[S2SinkStatus]): Seq[S2SinkStatus] = logs
+}
+
+object S2SinkMetadataLog {
+  private val VERSION = 1
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala
new file mode 100644
index 0000000..6198c81
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkProvider.scala
@@ -0,0 +1,26 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.{Config, ConfigFactory, ConfigRenderOptions}
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.streaming.Sink
+import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
+
+import scala.collection.JavaConversions._
+
+class S2SinkProvider extends StreamSinkProvider with DataSourceRegister with Logger {
+  override def createSink(
+                  sqlContext: SQLContext,
+                  parameters: Map[String, String],
+                  partitionColumns: Seq[String],
+                  outputMode: OutputMode): Sink = {
+
+    logger.info(s"S2SinkProvider options : ${parameters}")
+    val jobConf:Config = ConfigFactory.parseMap(parameters).withFallback(ConfigFactory.load())
+    logger.info(s"S2SinkProvider Configuration : ${jobConf.root().render(ConfigRenderOptions.concise())}")
+
+    new S2SparkSqlStreamingSink(sqlContext.sparkSession, jobConf)
+  }
+
+  override def shortName(): String = "s2graph"
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala
new file mode 100644
index 0000000..9b5ab8e
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SinkStatus.scala
@@ -0,0 +1,9 @@
+package org.apache.s2graph.spark.sql.streaming
+
+case class S2SinkStatus(
+                         taskId: Int,
+                         execTimeMillis: Long,
+                         successCnt:Long,
+                         failCnt:Long,
+                         counter:Map[String, Int]
+                       )

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
new file mode 100644
index 0000000..e705a7a
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2SparkSqlStreamingSink.scala
@@ -0,0 +1,71 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import java.util.UUID
+
+import com.typesafe.config.{Config, ConfigRenderOptions}
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.streaming.{MetadataLog, Sink}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+class S2SparkSqlStreamingSink(
+                               sparkSession: SparkSession,
+                               config:Config
+                             ) extends Sink with Logger {
+  import S2SinkConfigs._
+
+  private val APP_NAME = "s2graph"
+
+  private val writeLog: MetadataLog[Array[S2SinkStatus]] = {
+    val logPath = getCommitLogPath(config)
+    logger.info(s"MetaDataLogPath: $logPath")
+
+    new S2SinkMetadataLog(sparkSession, config, logPath)
+  }
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+    val dataCnt = data.count()
+    logger.debug(s"addBatch : $batchId, getLatest : ${writeLog.getLatest()}, data cnt : ${dataCnt}")
+
+    if (batchId <= writeLog.getLatest().map(_._1).getOrElse(-1L)) {
+      logger.info(s"Skipping already committed batch [$batchId]")
+    } else {
+      val queryName = getConfigStringOpt(config, "queryname").getOrElse(UUID.randomUUID().toString)
+      val commitProtocol = new S2CommitProtocol(writeLog)
+      val jobState = JobState(queryName, batchId, dataCnt)
+      val serializedConfig = config.root().render(ConfigRenderOptions.concise())
+      val queryExecution = data.queryExecution
+      val schema = data.schema
+
+      SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
+        try {
+          val taskCommits = sparkSession.sparkContext.runJob(queryExecution.toRdd,
+            (taskContext: TaskContext, iter: Iterator[InternalRow]) => {
+              new S2StreamQueryWriter(serializedConfig, schema, commitProtocol).run(taskContext, iter)
+            }
+          )
+          commitProtocol.commitJob(jobState, taskCommits)
+        } catch {
+          case t: Throwable =>
+            commitProtocol.abortJob(jobState)
+            throw t;
+        }
+
+      }
+    }
+  }
+
+  private def getCommitLogPath(config:Config): String = {
+    val logPathOpt = getConfigStringOpt(config, S2_SINK_LOG_PATH)
+    val userCheckpointLocationOpt = getConfigStringOpt(config, S2_SINK_CHECKPOINT_LOCATION)
+
+    (logPathOpt, userCheckpointLocationOpt) match {
+      case (Some(logPath), _) => logPath
+      case (None, Some(userCheckpoint)) => s"$userCheckpoint/sinks/$APP_NAME"
+      case _ => throw new IllegalArgumentException(s"failed to get commit log path")
+    }
+  }
+
+  override def toString(): String = "S2GraphSink"
+}

http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/b14b293d/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
----------------------------------------------------------------------
diff --git a/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
new file mode 100644
index 0000000..de5aea0
--- /dev/null
+++ b/s2jobs/src/main/scala/org/apache/s2graph/spark/sql/streaming/S2StreamQueryWriter.scala
@@ -0,0 +1,114 @@
+package org.apache.s2graph.spark.sql.streaming
+
+import com.typesafe.config.ConfigFactory
+import org.apache.s2graph.core.{GraphElement, JSONParser}
+import org.apache.s2graph.spark.sql.streaming.S2SinkConfigs._
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.types.StructType
+import play.api.libs.json.{JsObject, Json}
+
+import scala.collection.mutable.ListBuffer
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.Try
+
+private [sql] class S2StreamQueryWriter(
+                                         serializedConf:String,
+                                         schema: StructType ,
+                                         commitProtocol: S2CommitProtocol
+                                       ) extends Serializable with Logger {
+  private val config = ConfigFactory.parseString(serializedConf)
+  private val s2SinkContext = S2SinkContext(config)
+  private val encoder: ExpressionEncoder[Row] = RowEncoder(schema).resolveAndBind()
+  private val RESERVED_COLUMN = Set("timestamp", "from", "to", "label", "operation", "elem", "direction")
+
+
+  def run(taskContext: TaskContext, iters: Iterator[InternalRow]): TaskCommit = {
+    val taskId = s"stage-${taskContext.stageId()}, partition-${taskContext.partitionId()}, attempt-${taskContext.taskAttemptId()}"
+    val partitionId= taskContext.partitionId()
+
+    val s2Graph = s2SinkContext.getGraph
+    val groupedSize = getConfigString(config, S2_SINK_GROUPED_SIZE, DEFAULT_GROUPED_SIZE).toInt
+    val waitTime = getConfigString(config, S2_SINK_WAIT_TIME, DEFAULT_WAIT_TIME_SECONDS).toInt
+
+    commitProtocol.initTask()
+    try {
+      var list = new ListBuffer[(String, Int)]()
+      val rst = iters.flatMap(rowToEdge).grouped(groupedSize).flatMap{ elements =>
+        logger.debug(s"[$taskId][elements] ${elements.size} (${elements.map(e => e.toLogString).mkString(",\n")})")
+        elements.groupBy(_.serviceName).foreach{ case (service, elems) =>
+          list += ((service, elems.size))
+        }
+
+        val mutateF = s2Graph.mutateElements(elements, true)
+        Await.result(mutateF, Duration(waitTime, "seconds"))
+      }
+
+      val (success, fail) = rst.toSeq.partition(r => r.isSuccess)
+      val counter = list.groupBy(_._1).map{ case (service, t) =>
+        val sum = t.toList.map(_._2).sum
+        (service, sum)
+      }
+      logger.info(s"[$taskId] success : ${success.size}, fail : ${fail.size} ($counter)")
+
+
+      commitProtocol.commitTask(TaskState(partitionId, success.size, fail.size, counter))
+
+    } catch {
+      case t: Throwable =>
+        commitProtocol.abortTask(TaskState(partitionId))
+        throw t
+    }
+  }
+
+  private def rowToEdge(internalRow:InternalRow): Option[GraphElement] = {
+    val s2Graph = s2SinkContext.getGraph
+    val row = encoder.fromRow(internalRow)
+
+    val timestamp = row.getAs[Long]("timestamp")
+    val operation = Try(row.getAs[String]("operation")).getOrElse("insert")
+    val elem = Try(row.getAs[String]("elem")).getOrElse("e")
+
+    val props: Map[String, Any] = Option(row.getAs[String]("props")) match {
+      case Some(propsStr:String) =>
+        JSONParser.fromJsonToProperties(Json.parse(propsStr).as[JsObject])
+      case None =>
+        schema.fieldNames.flatMap { field =>
+          if (!RESERVED_COLUMN.contains(field)) {
+            Seq(
+              field -> getRowValAny(row, field)
+            )
+          } else Nil
+        }.toMap
+    }
+
+    elem match {
+      case "e" | "edge" =>
+        val from = getRowValAny(row, "from")
+        val to = getRowValAny(row, "to")
+        val label = row.getAs[String]("label")
+        val direction = Try(row.getAs[String]("direction")).getOrElse("out")
+        Some(
+          s2Graph.elementBuilder.toEdge(from, to, label, direction, props, timestamp, operation)
+        )
+      case "v" | "vertex" =>
+        val id = getRowValAny(row, "id")
+        val serviceName = row.getAs[String]("service")
+        val columnName = row.getAs[String]("column")
+        Some(
+          s2Graph.elementBuilder.toVertex(serviceName, columnName, id, props, timestamp, operation)
+        )
+      case _ =>
+        logger.warn(s"'$elem' is not GraphElement. skipped!! (${row.toString()})")
+        None
+    }
+  }
+
+  private def getRowValAny(row:Row, fieldName:String):Any = {
+    val idx = row.fieldIndex(fieldName)
+    row.get(idx)
+  }
+}
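
For reference, a small batch-mode sketch of rows shaped the way rowToEdge above expects them; the label, service, column, and id values are made up. Edge rows use elem = "e" with from/to/label/direction, vertex rows use elem = "v" with id/service/column; a "props" json column carries the properties (when "props" is present but null, the remaining non-reserved columns are used instead).

    // Illustrative row shapes only; values are hypothetical.
    import org.apache.spark.sql.SparkSession

    object RowShapeExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[2]").appName("row-shape").getOrCreate()
        import spark.implicits._

        // Edge rows: elem = "e"; properties supplied through the "props" json column.
        val edgeRows = Seq(
          (1521700000000L, "e", "u1", "u2", "friend_of", "insert", "out", """{"score": 0.9}""")
        ).toDF("timestamp", "elem", "from", "to", "label", "operation", "direction", "props")

        // Vertex rows: elem = "v"; "service" and "column" locate the s2graph vertex column.
        val vertexRows = Seq(
          (1521700000000L, "v", "u1", "my_service", "user_id", """{"age": 20}""")
        ).toDF("timestamp", "elem", "id", "service", "column", "props")

        edgeRows.show(false)
        vertexRows.show(false)
      }
    }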
