waitinfuture commented on code in PR #7360:
URL: https://github.com/apache/incubator-gluten/pull/7360#discussion_r1799517460


##########
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.expression.ExpressionUtils
+import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.columnar.validator.Validator.Passed
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection}
+import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector}
+import org.apache.spark.sql.hive.HiveUdfUtil
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * By rule <PartialProjectRule>, a project that is not offload-able is changed to
+ * ProjectExecTransformer + ColumnarPartialProjectExec. e.g. for sum(myudf(a) + b + hash(c)) with
+ * child (a, b, c): ColumnarPartialProjectExec outputs (a, b, c, myudf(a)), then
+ * ProjectExecTransformer computes myudf(a) + b + hash(c).
+ *
+ * @param original
+ *   the original ProjectExec; the ScalaUDFs extracted from its project list become Aliases in the
+ *   UnsafeProjection and AttributeReferences in the ColumnarPartialProjectExec output
+ * @param child
+ *   child plan
+ */
+case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
+    replacedAliasUdf: ListBuffer[Alias])
+  extends UnaryExecNode
+  with GlutenPlan {
+
+  private val debug = GlutenConfig.getConf.debug
+
+  private val projectAttributes: ListBuffer[Attribute] = ListBuffer()
+  private val projectIndexInChild: ListBuffer[Int] = ListBuffer()
+  private var UDFAttrNotExists = false
+  private var hasUnsupportedDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType))
+  if (!hasUnsupportedDataType) {
+    getProjectIndexInChildOutput(replacedAliasUdf)
+  }
+
+  @transient override lazy val metrics = Map(
+    "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of 
partial project"),
+    "column_to_row_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of velox to Arrow ColumnarBatch or UnsafeRow"),
+    "row_to_column_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of Arrow ColumnarBatch or UnsafeRow to velox")
+  )
+
+  override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute)

Review Comment:
   No need to include all of the child's output if it is only used by `replacedAliasUdf`.
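
   A rough sketch of the suggested pruning (hypothetical, not code from this PR: `usedByParent` stands in for however the set of child attributes still referenced by the downstream ProjectExecTransformer would be computed):

   ```scala
   // Hypothetical sketch: expose only the child columns the parent still
   // references, plus the replaced UDF outputs, instead of all of child.output.
   override def output: Seq[Attribute] = {
     // usedByParent: assumed AttributeSet of columns the downstream
     // ProjectExecTransformer references directly (illustrative helper).
     child.output.filter(usedByParent.contains) ++ replacedAliasUdf.map(_.toAttribute)
   }
   ```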



##########
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala:
##########
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.expression.ExpressionUtils
+import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.columnar.validator.Validator.Passed
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection}
+import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector}
+import org.apache.spark.sql.hive.HiveUdfUtil
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * By rule <PartialProjectRule>, a project that is not offload-able is changed to
+ * ProjectExecTransformer + ColumnarPartialProjectExec. e.g. for sum(myudf(a) + b + hash(c)) with
+ * child (a, b, c): ColumnarPartialProjectExec outputs (a, b, c, myudf(a)), then
+ * ProjectExecTransformer computes myudf(a) + b + hash(c).
+ *
+ * @param original
+ *   the original ProjectExec; the ScalaUDFs extracted from its project list become Aliases in the
+ *   UnsafeProjection and AttributeReferences in the ColumnarPartialProjectExec output
+ * @param child
+ *   child plan
+ */
+case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
+    replacedAliasUdf: ListBuffer[Alias])
+  extends UnaryExecNode
+  with GlutenPlan {
+
+  private val debug = GlutenConfig.getConf.debug
+
+  private val projectAttributes: ListBuffer[Attribute] = ListBuffer()
+  private val projectIndexInChild: ListBuffer[Int] = ListBuffer()
+  private var UDFAttrNotExists = false
+  private var hasUnsupportedDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType))
+  if (!hasUnsupportedDataType) {
+    getProjectIndexInChildOutput(replacedAliasUdf)
+  }
+
+  @transient override lazy val metrics = Map(
+    "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of 
partial project"),
+    "column_to_row_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of velox to Arrow ColumnarBatch or UnsafeRow"),
+    "row_to_column_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of Arrow ColumnarBatch or UnsafeRow to velox")
+  )
+
+  override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute)
+
+  final override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException(
+      s"${this.getClass.getSimpleName} doesn't support doExecute")
+  }
+
+  final override protected def otherCopyArgs: Seq[AnyRef] = {
+    replacedAliasUdf :: Nil
+  }
+
+  final override val supportsColumnar: Boolean = true
+
+  private def validateExpression(expr: Expression): Boolean = {
+    expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children
+      .forall(validateExpression)
+  }
+
+  private def validateDataType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: BooleanType => true
+      case _: ByteType => true
+      case _: ShortType => true
+      case _: IntegerType => true
+      case _: LongType => true
+      case _: FloatType => true
+      case _: DoubleType => true
+      case _: StringType => true
+      case _: TimestampType => true
+      case _: DateType => true
+      case _: BinaryType => true
+      case _: DecimalType => true
+      case YearMonthIntervalType.DEFAULT => true
+      case _: NullType => true
+      case _ => false

Review Comment:
   Is there a particular reason not to support complex types?
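
   If they were to be supported, the additional match cases might look like this (a sketch only; it assumes `ArrayType`, `MapType`, and `StructType` are added to the imports and that the Arrow/Velox conversion paths handle the nested layouts):

   ```scala
   // Hypothetical extension of validateDataType: recurse into nested types so
   // that e.g. array<struct<int, string>> passes when every leaf type passes.
   case ArrayType(elementType, _) => validateDataType(elementType)
   case MapType(keyType, valueType, _) =>
     validateDataType(keyType) && validateDataType(valueType)
   case StructType(fields) => fields.forall(f => validateDataType(f.dataType))
   ```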



##########
backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala:
##########
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
+import org.apache.gluten.expression.ExpressionUtils
+import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.columnar.validator.Validator.Passed
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
+import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Coalesce, Expression, If, LambdaFunction, MutableProjection, NamedExpression, NaNvl, ScalaUDF, UnsafeProjection}
+import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.vectorized.{MutableColumnarRow, WritableColumnVector}
+import org.apache.spark.sql.hive.HiveUdfUtil
+import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NullType, ShortType, StringType, TimestampType, YearMonthIntervalType}
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * By rule <PartialProjectRule>, a project that is not offload-able is changed to
+ * ProjectExecTransformer + ColumnarPartialProjectExec. e.g. for sum(myudf(a) + b + hash(c)) with
+ * child (a, b, c): ColumnarPartialProjectExec outputs (a, b, c, myudf(a)), then
+ * ProjectExecTransformer computes myudf(a) + b + hash(c).
+ *
+ * @param original
+ *   the original ProjectExec; the ScalaUDFs extracted from its project list become Aliases in the
+ *   UnsafeProjection and AttributeReferences in the ColumnarPartialProjectExec output
+ * @param child
+ *   child plan
+ */
+case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
+    replacedAliasUdf: ListBuffer[Alias])
+  extends UnaryExecNode
+  with GlutenPlan {
+
+  private val debug = GlutenConfig.getConf.debug
+
+  private val projectAttributes: ListBuffer[Attribute] = ListBuffer()
+  private val projectIndexInChild: ListBuffer[Int] = ListBuffer()
+  private var UDFAttrNotExists = false
+  private var hasUnsupportedDataType = replacedAliasUdf.exists(a => !validateDataType(a.dataType))
+  if (!hasUnsupportedDataType) {
+    getProjectIndexInChildOutput(replacedAliasUdf)
+  }
+
+  @transient override lazy val metrics = Map(
+    "time" -> SQLMetrics.createTimingMetric(sparkContext, "total time of 
partial project"),
+    "column_to_row_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of velox to Arrow ColumnarBatch or UnsafeRow"),
+    "row_to_column_time" -> SQLMetrics.createTimingMetric(
+      sparkContext,
+      "time of Arrow ColumnarBatch or UnsafeRow to velox")
+  )
+
+  override def output: Seq[Attribute] = child.output ++ replacedAliasUdf.map(_.toAttribute)
+
+  final override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException(
+      s"${this.getClass.getSimpleName} doesn't support doExecute")
+  }
+
+  final override protected def otherCopyArgs: Seq[AnyRef] = {
+    replacedAliasUdf :: Nil
+  }
+
+  final override val supportsColumnar: Boolean = true
+
+  private def validateExpression(expr: Expression): Boolean = {
+    expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children
+      .forall(validateExpression)
+  }
+
+  private def validateDataType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: BooleanType => true
+      case _: ByteType => true
+      case _: ShortType => true
+      case _: IntegerType => true
+      case _: LongType => true
+      case _: FloatType => true
+      case _: DoubleType => true
+      case _: StringType => true
+      case _: TimestampType => true
+      case _: DateType => true
+      case _: BinaryType => true
+      case _: DecimalType => true
+      case YearMonthIntervalType.DEFAULT => true
+      case _: NullType => true
+      case _ => false
+    }
+  }
+
+  private def getProjectIndexInChildOutput(exprs: Seq[Expression]): Unit = {
+    exprs.foreach {
+      case a: AttributeReference =>
+        val index = child.output.indexWhere(s => s.exprId.equals(a.exprId))
+        // Some child operators, such as HashAggregateTransformer, will not have the udf child column
+        if (index < 0) {
+          UDFAttrNotExists = true
+          log.debug(s"Expression $a should exist in child output ${child.output}")
+          return
+        } else if (!validateDataType(a.dataType)) {
+          hasUnsupportedDataType = true
+          log.debug(s"Expression $a contains unsupported data type ${a.dataType}")
+        } else if (!projectIndexInChild.contains(index)) {
+          projectAttributes.append(a.toAttribute)
+          projectIndexInChild.append(index)
+        }
+      case p => getProjectIndexInChildOutput(p.children)
+    }
+  }
+
+  override protected def doValidateInternal(): ValidationResult = {
+    if (!GlutenConfig.getConf.enableColumnarPartialProject) {
+      return ValidationResult.failed("Config disables this feature")
+    }
+    if (UDFAttrNotExists) {
+      return ValidationResult.failed("Attribute in the UDF does not exist in its child")
+    }
+    if (hasUnsupportedDataType) {
+      return ValidationResult.failed("Attribute in the UDF contains an unsupported type")
+    }
+    if (projectAttributes.size == child.output.size) {
+      return ValidationResult.failed("UDF needs all the columns in child output")
+    }
+    if (original.output.isEmpty) {
+      return ValidationResult.failed("Project fallback because output is empty")
+    }
+    if (replacedAliasUdf.isEmpty) {
+      return ValidationResult.failed("No UDF")
+    }
+    if (replacedAliasUdf.size > original.output.size) {
+      // e.g. udf1(col) + udf2(col) introduces 2 cols for r2c
+      return ValidationResult.failed("Number of RowToColumn columns is more than ProjectExec")
+    }
+    if (!original.projectList.forall(validateExpression(_))) {
+      return ValidationResult.failed("Contains expression not supported")
+    }
+    if (isComplexExpression()) {
+      return ValidationResult.failed("Fallback by complex expression")
+    }
+    ValidationResult.succeeded
+  }
+
+  private def isComplexExpression(): Boolean = {
+    new ExpressionUtils.FallbackComplexExpressions(
+      GlutenConfig.getConf.fallbackExpressionsThreshold)
+      .validate(original) match {
+      case Passed => false
+      case _ => true
+    }
+  }
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val totalTime = longMetric("time")
+    val c2r = longMetric("column_to_row_time")
+    val r2c = longMetric("row_to_column_time")
+    val isMutable = canUseMutableProjection()
+    child.executeColumnar().mapPartitions {
+      batches =>
+        val res: Iterator[Iterator[ColumnarBatch]] = new Iterator[Iterator[ColumnarBatch]] {
+          override def hasNext: Boolean = batches.hasNext
+
+          override def next(): Iterator[ColumnarBatch] = {
+            val batch = batches.next()
+            if (batch.numRows == 0) {
+              Iterator.empty
+            } else {
+              val start = System.currentTimeMillis()
+              val childData = ColumnarBatches.select(batch, projectIndexInChild.toArray)
+              val projectedBatch = if (isMutable) {
+                getProjectedBatchArrow(childData, c2r, r2c)
+              } else getProjectedBatch(childData, c2r, r2c)
+              val batchIterator = projectedBatch.map {
+                b =>
+                  if (b.numCols() != 0) {
+                    val compositeBatch = VeloxColumnarBatches.compose(batch, b)
+                    b.close()
+                    compositeBatch
+                  } else {
+                    b.close()
+                    ColumnarBatches.retain(batch)
+                    batch
+                  }
+              }
+              childData.close()
+              totalTime += System.currentTimeMillis() - start
+              batchIterator
+            }
+          }
+        }
+        Iterators
+          .wrap(res.flatten)
+          .protectInvocationFlow() // Spark may call `hasNext()` again after a false output which
+          // is not allowed by Gluten iterators. E.g. GroupedIterator#fetchNextGroupIterator
+          .recyclePayload(_.close())
+          .create()
+
+    }
+  }
+
+  // scalastyle:off line.size.limit
+  // String type cannot use MutableProjection
+  // Otherwise will throw java.lang.UnsupportedOperationException: Datatype not supported StringType
+  // at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.update(MutableColumnarRow.java:224)
+  // at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
+  // scalastyle:on line.size.limit
+  private def canUseMutableProjection(): Boolean = {
+    replacedAliasUdf.forall(
+      r =>
+        r.dataType match {
+          case StringType | BinaryType => false
+          case _ => true
+        })
+  }
+
+  /**
+   * Add c2r and r2c around the unsupported expression: c2r turns the child data into an
+   * Iterator[InternalRow], then the Spark projection is applied, then r2c converts back.
+   */
+  private def getProjectedBatch(
+      childData: ColumnarBatch,
+      c2r: SQLMetric,
+      r2c: SQLMetric): Iterator[ColumnarBatch] = {
+    // select part of child output and child data
+    val proj = UnsafeProjection.create(replacedAliasUdf, projectAttributes)
+    val numOutputRows = new SQLMetric("numOutputRows")
+    val numInputBatches = new SQLMetric("numInputBatches")
+    val rows = VeloxColumnarToRowExec
+      .toRowIterator(
+        Iterator.single[ColumnarBatch](childData),
+        projectAttributes,
+        numOutputRows,
+        numInputBatches,
+        c2r)
+      .map(proj)
+
+    val schema =
+      SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute))
+    RowToVeloxColumnarExec.toColumnarBatchIterator(
+      rows,
+      schema,
+      numOutputRows,
+      numInputBatches,
+      r2c,
+      childData.numRows())
+    // TODO: should check that the size is <= 1, but it currently has a bug that changes the iterator to empty
+  }
+
+  private def getProjectedBatchArrow(
+      childData: ColumnarBatch,
+      c2a: SQLMetric,
+      a2c: SQLMetric): Iterator[ColumnarBatch] = {
+    // select part of child output and child data
+    val proj = MutableProjection.create(replacedAliasUdf, projectAttributes)
+    val numRows = childData.numRows()
+    val start = System.currentTimeMillis()
+    val arrowBatch = if (childData.numCols() == 0) { childData }
+    else ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData)
+    c2a += System.currentTimeMillis() - start
+
+    val schema =
+      SparkShimLoader.getSparkShims.structFromAttributes(replacedAliasUdf.map(_.toAttribute))
+    val vectors: Array[WritableColumnVector] = ArrowWritableColumnVector
+      .allocateColumns(numRows, schema)
+      .map {
+        vector =>
+          vector.setValueCount(numRows)
+          vector
+      }
+    val targetRow = new MutableColumnarRow(vectors)
+    for (i <- 0 until numRows) {
+      targetRow.rowId = i
+      proj.target(targetRow).apply(arrowBatch.getRow(i))
+    }
+    val targetBatch = new ColumnarBatch(vectors.map(_.asInstanceOf[ColumnVector]), numRows)
+    val start2 = System.currentTimeMillis()
+    val veloxBatch =
+      ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), targetBatch)
+    a2c += System.currentTimeMillis() - start2
+    Iterators
+      .wrap(Iterator.single(veloxBatch))
+      .recycleIterator({
+        arrowBatch.close()
+        targetBatch.close()
+      })
+      .create()
+    // TODO: should check that the size is <= 1, but it currently has a bug that changes the iterator to empty
+  }
+
+  override def verboseStringWithOperatorId(): String = {
+    s"""
+       |$formattedNodeName
+       |${ExplainUtils.generateFieldString("Output", output)}
+       |${ExplainUtils.generateFieldString("Input", child.output)}
+       |${ExplainUtils.generateFieldString("ScalaUDF", replacedAliasUdf)}

Review Comment:
   nit: ScalaUDF -> UDF


