This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 243098c39af [HUDI-6223] Support Incremental read via Spark SQL table 
valued function (#8729)
243098c39af is described below

commit 243098c39afffd79d881c056282016a2732eaabc
Author: kazdy <[email protected]>
AuthorDate: Tue Jun 20 04:05:51 2023 +0200

    [HUDI-6223] Support Incremental read via Spark SQL table valued function 
(#8729)
    
    * add hudi_table_changes TVF
    * add "earliest" start instant option to hudi_table_changes TVF
    * add HoodieTableChanges to catalyst plan analysis
    * the first param of hudi_table_changes is polymophic, it can be the table 
id or table path
    
    ---------
    
    Co-authored-by: Danny Chan <[email protected]>
---
 .../hudi/functional/TestSparkSqlCoreFlow.scala     |  16 +-
 .../sql/hudi/TestHoodieTableValuedFunction.scala   | 265 +++++++++++++++++++++
 .../catalyst/plans/logcal/HoodieTableChanges.scala |  82 +++++++
 .../hudi/analysis/HoodieSpark32PlusAnalysis.scala  |  28 ++-
 .../sql/hudi/analysis/TableValuedFunctions.scala   |   7 +-
 5 files changed, 379 insertions(+), 19 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
index cf36f4b258f..fa883cd3eb2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlCoreFlow.scala
@@ -121,16 +121,12 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase 
{
     compareUpdateDfWithHudiDf(inputDf2, snapshotDf3, snapshotDf3)
     snapshotDf3.unpersist(true)
 
-    // Read Incremental Query, need to use spark-ds because functionality does 
not exist for spark sql
+    // Read Incremental Query, uses hudi_table_changes() table valued function 
for spark sql
     // we have 2 commits, try pulling the first commit (which is not the 
latest)
     //HUDI-5266
     val firstCommit = listCommitsSince(fs, tableBasePath, "000").get(0)
-    val hoodieIncViewDf1 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
-      .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
-      .load(tableBasePath)
-    //val hoodieIncViewDf1 = doIncRead(tableName, isMetadataEnabledOnRead, 
"000", firstCommit)
+    val hoodieIncViewDf1 = spark.sql(s"select * from 
hudi_table_changes('$tableName', 'earliest', '$firstCommit')")
+
     assertEquals(100, hoodieIncViewDf1.count()) // 100 initial inserts must be 
pulled
     var countsPerCommit = 
hoodieIncViewDf1.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
@@ -141,11 +137,7 @@ class TestSparkSqlCoreFlow extends HoodieSparkSqlTestBase {
 
     //another incremental query with commit2 and commit3
     //HUDI-5266
-    val hoodieIncViewDf2 = spark.read.format("org.apache.hudi")
-      .option(DataSourceReadOptions.QUERY_TYPE.key, 
QUERY_TYPE_INCREMENTAL_OPT_VAL)
-      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime2)
-      .option(DataSourceReadOptions.END_INSTANTTIME.key(), commitInstantTime3)
-      .load(tableBasePath)
+    val hoodieIncViewDf2 = spark.sql(s"select * from 
hudi_table_changes('$tableName', '$commitInstantTime2', '$commitInstantTime3')")
 
     assertEquals(uniqueKeyCnt2, hoodieIncViewDf2.count()) // 60 records must 
be pulled
     countsPerCommit = 
hoodieIncViewDf2.groupBy("_hoodie_commit_time").count().collect()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
index 09aa9a50456..1b0aa3ff6e1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestHoodieTableValuedFunction.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hudi.HoodieSparkUtils
+import org.apache.spark.sql.functions.{col, from_json}
 
 class TestHoodieTableValuedFunction extends HoodieSparkSqlTestBase {
 
@@ -80,4 +81,268 @@ class TestHoodieTableValuedFunction extends 
HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test(s"Test hudi_table_changes latest_state") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        Seq(
+          ("cow", true),
+          ("mor", true),
+          ("cow", false),
+          ("mor", false)
+        ).foreach { parameters =>
+          val tableType = parameters._1
+          val isTableId = parameters._2
+          val tableName = generateTableName
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+          val identifier = if (isTableId) tableName else tablePath
+          spark.sql("set hoodie.sql.insert.mode = non-strict")
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               |)
+               |location '$tablePath'
+               |""".stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, 1000), (2, 'a2', 20, 1000), (3, 'a3', 
30, 1000)
+               | """.stripMargin
+          )
+
+          val firstInstant = spark.sql(s"select min(_hoodie_commit_time) as 
commitTime from  $tableName order by commitTime").first().getString(0)
+
+          checkAnswer(
+            s"""select id,
+               |name,
+               |price,
+               |ts
+               |from hudi_table_changes('$identifier', 'latest_state', 
'earliest')
+               |""".stripMargin
+          )(
+            Seq(1, "a1", 10.0, 1000),
+            Seq(2, "a2", 20.0, 1000),
+            Seq(3, "a3", 30.0, 1000)
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3, 
'a3_3', 30, 1100)
+               | """.stripMargin
+          )
+          val secondInstant = spark.sql(s"select max(_hoodie_commit_time) as 
commitTime from  $tableName order by commitTime").first().getString(0)
+
+          checkAnswer(
+            s"""select id,
+               |name,
+               |price,
+               |ts
+               |from hudi_table_changes(
+               |'$identifier',
+               |'latest_state',
+               |'$firstInstant')
+               |""".stripMargin
+          )(
+            Seq(1, "a1_1", 10.0, 1100),
+            Seq(2, "a2_2", 20.0, 1100),
+            Seq(3, "a3_3", 30.0, 1100)
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1_1', 10, 1200), (2, 'a2_2', 20, 1200), (3, 
'a3_3', 30, 1200)
+               | """.stripMargin
+          )
+
+          // should not include the first and latest instant
+          checkAnswer(
+            s"""select id,
+               | name,
+               | price,
+               | ts
+               | from hudi_table_changes(
+               | '$identifier',
+               | 'latest_state',
+               | '$firstInstant',
+               | '$secondInstant')
+               | """.stripMargin
+          )(
+            Seq(1, "a1_1", 10.0, 1100),
+            Seq(2, "a2_2", 20.0, 1100),
+            Seq(3, "a3_3", 30.0, 1100)
+          )
+        }
+      }
+    }
+  }
+
+  test(s"Test hudi_table_changes cdc") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      withTempDir { tmp =>
+        Seq(
+          ("cow", true),
+          ("mor", true),
+          ("cow", false),
+          ("mor", false)
+        ).foreach { parameters =>
+          val tableType = parameters._1
+          val isTableId = parameters._2
+          val tableName = generateTableName
+          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+          val identifier = if (isTableId) tableName else tablePath
+          spark.sql("set hoodie.sql.insert.mode = upsert")
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               |tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  'hoodie.table.cdc.enabled' = 'true',
+               |  'hoodie.table.cdc.supplemental.logging.mode' = 
'data_before_after'
+               |)
+               |location '$tablePath'
+               |""".stripMargin
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1', 10, 1000), (2, 'a2', 20, 1000), (3, 'a3', 
30, 1000)
+               | """.stripMargin
+          )
+          val originSchema = spark.read.format("hudi").load(tablePath).schema
+          val firstInstant = spark.sql(s"select min(_hoodie_commit_time) as 
commitTime from  $tableName order by commitTime").first().getString(0)
+
+          val cdcDataOnly1 = spark.sql(
+            s"""select
+               | op,
+               | before,
+               | after
+               |from hudi_table_changes('$identifier', 'cdc', 'earliest')
+               |""".stripMargin
+          )
+
+          val change1 = cdcDataOnly1.select(
+            col("op"),
+            col("before"),
+            from_json(col("after"), originSchema).as("after")
+          ).select(
+            col("op"),
+            col("before"),
+            col("after.id"),
+            col("after.name"),
+            col("after.price"),
+            col("after.ts")
+          ).orderBy("after.id").collect()
+          checkAnswer(change1)(
+            Seq("i", null, 1, "a1", 10.0, 1000),
+            Seq("i", null, 2, "a2", 20.0, 1000),
+            Seq("i", null, 3, "a3", 30.0, 1000)
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1_1', 10, 1100), (2, 'a2_2', 20, 1100), (3, 
'a3_3', 30, 1100)
+               | """.stripMargin
+          )
+          val secondInstant = spark.sql(s"select max(_hoodie_commit_time) as 
commitTime from  $tableName order by commitTime").first().getString(0)
+
+          val cdcDataOnly2 = spark.sql(
+            s"""select
+               | op,
+               | before,
+               | after
+               |from hudi_table_changes(
+               |'$identifier',
+               |'cdc',
+               |'$firstInstant')
+               |""".stripMargin
+          )
+
+          val change2 = cdcDataOnly2.select(
+            col("op"),
+            from_json(col("before"), originSchema).as("before"),
+            from_json(col("after"), originSchema).as("after")
+          ).select(
+            col("op"),
+            col("before.id"),
+            col("before.name"),
+            col("before.price"),
+            col("before.ts"),
+            col("after.id"),
+            col("after.name"),
+            col("after.price"),
+            col("after.ts")
+          ).orderBy("after.id").collect()
+          checkAnswer(change2)(
+            Seq("u", 1, "a1", 10.0, 1000, 1, "a1_1", 10.0, 1100),
+            Seq("u", 2, "a2", 20.0, 1000, 2, "a2_2", 20.0, 1100),
+            Seq("u", 3, "a3", 30.0, 1000, 3, "a3_3", 30.0, 1100)
+          )
+
+          spark.sql(
+            s"""
+               | insert into $tableName
+               | values (1, 'a1_1', 11, 1200), (2, 'a2_2', 21, 1200), (3, 
'a3_3', 31, 1200)
+               | """.stripMargin
+          )
+
+          // should not include the first and latest instant
+          val cdcDataOnly3 = spark.sql(
+            s"""select
+               | op,
+               | before,
+               | after
+               | from hudi_table_changes(
+               | '$identifier',
+               | 'cdc',
+               | '$firstInstant',
+               | '$secondInstant')
+               | """.stripMargin
+          )
+
+          val change3 = cdcDataOnly3.select(
+            col("op"),
+            from_json(col("before"), originSchema).as("before"),
+            from_json(col("after"), originSchema).as("after")
+          ).select(
+            col("op"),
+            col("before.id"),
+            col("before.name"),
+            col("before.price"),
+            col("before.ts"),
+            col("after.id"),
+            col("after.name"),
+            col("after.price"),
+            col("after.ts")
+          ).orderBy("after.id").collect()
+          checkAnswer(change3)(
+            Seq("u", 1, "a1", 10.0, 1000, 1, "a1_1", 10.0, 1100),
+            Seq("u", 2, "a2", 20.0, 1000, 2, "a2_2", 20.0, 1100),
+            Seq("u", 3, "a3", 30.0, 1000, 3, "a3_3", 30.0, 1100)
+          )
+        }
+      }
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieTableChanges.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieTableChanges.scala
new file mode 100644
index 00000000000..8199e18918b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/catalyst/plans/logcal/HoodieTableChanges.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logcal
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+
+object HoodieTableChangesOptionsParser {
+  def parseOptions(exprs: Seq[Expression], funcName: String): (String, 
Map[String, String]) = {
+    val args = exprs.map(_.eval().toString)
+
+    if (args.size < 3 || args.size > 4) {
+      throw new AnalysisException(s"Expect arguments (table_name or 
table_path, incremental_format, start_instant, [end_instant]) for function 
`$funcName`")
+    }
+
+    val identifier = args.head
+    val incrementalQueryFormat = args(1)
+    val startInstantTime = args(2)
+    val endInstantTime = args.drop(3).headOption
+
+    val incrementalQueryTypeOpt = Map("hoodie.datasource.query.type" -> 
"incremental")
+
+    val incrementalQueryFormatOpt = incrementalQueryFormat match {
+      case "latest_state" | "cdc" => 
Map("hoodie.datasource.query.incremental.format" -> incrementalQueryFormat)
+      case _ => throw new AnalysisException(s"'hudi_table_changes' doesn't 
support `$incrementalQueryFormat`")
+    }
+
+    val startInstantTimeOpt = startInstantTime match {
+      case "earliest" => Map("hoodie.datasource.read.begin.instanttime" -> 
"000")
+      case _ => Map("hoodie.datasource.read.begin.instanttime" -> 
startInstantTime)
+    }
+
+    val endInstantTimeOpt = endInstantTime match {
+      case Some(x) => Map("hoodie.datasource.read.end.instanttime" -> x)
+      case None => Map.empty[String, String]
+    }
+
+    val opts: Map[String, String] = incrementalQueryTypeOpt ++ 
incrementalQueryFormatOpt ++ startInstantTimeOpt ++ endInstantTimeOpt
+
+    (identifier, opts)
+  }
+
+}
+
+
+case class HoodieTableChanges(args: Seq[Expression]) extends LeafNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override lazy val resolved: Boolean = false
+
+}
+
+object HoodieTableChanges {
+
+  val FUNC_NAME = "hudi_table_changes";
+
+}
+
+case class HoodieTableChangesByPath(args: Seq[Expression]) extends LeafNode {
+
+  override def output: Seq[Attribute] = Nil
+
+  override lazy val resolved: Boolean = false
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
index 58841f19df2..fc4d5d18ff2 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSpark32PlusAnalysis.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark.sql.hudi.analysis
 
+import org.apache.hadoop.fs.Path
 import org.apache.hudi.{DataSourceReadOptions, DefaultSource, 
SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils.MatchResolvedTable
 import org.apache.spark.sql.catalyst.analysis.UnresolvedPartitionSpec
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
-import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
-import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery.parseOptions
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, 
HoodieTableChanges, HoodieTableChangesByPath, HoodieTableChangesOptionsParser}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
@@ -54,12 +54,12 @@ case class HoodieDataSourceV2ToV1Fallback(sparkSession: 
SparkSession) extends Ru
 
     // NOTE: Unfortunately, [[InsertIntoStatement]] is implemented in a way 
that doesn't expose
     //       target relation as a child (even though there's no good reason 
for that)
-    case iis @ InsertIntoStatement(rv2 @ DataSourceV2Relation(v2Table: 
HoodieInternalV2Table, _, _, _, _), _, _, _, _, _) =>
+    case iis@InsertIntoStatement(rv2@DataSourceV2Relation(v2Table: 
HoodieInternalV2Table, _, _, _, _), _, _, _, _, _) =>
       iis.copy(table = convertToV1(rv2, v2Table))
 
     case _ =>
       plan.resolveOperatorsDown {
-        case rv2 @ DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, 
_, _) => convertToV1(rv2, v2Table)
+        case rv2@DataSourceV2Relation(v2Table: HoodieInternalV2Table, _, _, _, 
_) => convertToV1(rv2, v2Table)
       }
   }
 
@@ -101,8 +101,8 @@ case class HoodieSpark32PlusResolveReferences(spark: 
SparkSession) extends Rule[
 
       LogicalRelation(relation, table)
 
-    case q: HoodieQuery =>
-      val (tableName, opts) = parseOptions(q.args)
+    case HoodieQuery(args) =>
+      val (tableName, opts) = HoodieQuery.parseOptions(args)
 
       val tableId = 
spark.sessionState.sqlParser.parseTableIdentifier(tableName)
       val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
@@ -112,6 +112,22 @@ case class HoodieSpark32PlusResolveReferences(spark: 
SparkSession) extends Rule[
         catalogTable.location.toString))
 
       LogicalRelation(relation, catalogTable)
+
+    case HoodieTableChanges(args) =>
+      val (tablePath, opts) = 
HoodieTableChangesOptionsParser.parseOptions(args, HoodieTableChanges.FUNC_NAME)
+      val hoodieDataSource = new DefaultSource
+      if (tablePath.contains(Path.SEPARATOR)) {
+        // the first param is table path
+        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts 
++ Map("path" -> tablePath))
+        LogicalRelation(relation)
+      } else {
+        // the first param is table identifier
+        val tableId = 
spark.sessionState.sqlParser.parseTableIdentifier(tablePath)
+        val catalogTable = spark.sessionState.catalog.getTableMetadata(tableId)
+        val relation = hoodieDataSource.createRelation(spark.sqlContext, opts 
++ Map("path" ->
+          catalogTable.location.toString))
+        LogicalRelation(relation, catalogTable)
+      }
   }
 }
 
diff --git 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
index d71ece5dcd0..7a7da9b2752 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2plus-common/src/main/scala/org/apache/spark/sql/hudi/analysis/TableValuedFunctions.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.analysis
 
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
-import org.apache.spark.sql.catalyst.plans.logcal.HoodieQuery
+import org.apache.spark.sql.catalyst.plans.logcal.{HoodieQuery, 
HoodieTableChanges}
 
 object TableValuedFunctions {
 
@@ -28,6 +28,11 @@ object TableValuedFunctions {
       FunctionIdentifier(HoodieQuery.FUNC_NAME),
       new ExpressionInfo(HoodieQuery.getClass.getCanonicalName, 
HoodieQuery.FUNC_NAME),
       (args: Seq[Expression]) => new HoodieQuery(args)
+    ),
+    (
+      FunctionIdentifier(HoodieTableChanges.FUNC_NAME),
+      new ExpressionInfo(HoodieTableChanges.getClass.getCanonicalName, 
HoodieTableChanges.FUNC_NAME),
+      (args: Seq[Expression]) => new HoodieTableChanges(args)
     )
   )
 }

Reply via email to