This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d88685  [CARBONDATA-3663] Support loading stage files in batches
3d88685 is described below

commit 3d88685757b4f5eef330bc543e0d8d20f70bb8df
Author: liuzhi <371684...@qq.com>
AuthorDate: Tue Jan 14 14:46:28 2020 +0800

    [CARBONDATA-3663] Support loading stage files in batches
    
    Why is this PR needed?
    When there are a lot of stage files in the stage directory, loading all of them
    at once makes the loading time hard to control.
    Users need a way to specify the number of stage files handled per command, so
    that the execution time of the command can be controlled.
    
    What changes were proposed in this PR?
    Add a load option batch_file_count for users to specify the number of stage
    files processed per command.
    
    Does this PR introduce any user interface change?
    Yes
    
    Is any new testcase added?
    Yes
    
    This closes #3578
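
    For context, the new option is exercised from SQL as in the sketch below (the
    table name is illustrative and a CarbonData-enabled SparkSession named spark
    is assumed):

    ```
    // Without OPTIONS, every ready stage file is picked up by one command, as before.
    spark.sql("INSERT INTO table1 STAGE")
    // With batch_file_count, at most 5 stage files are loaded per command invocation.
    spark.sql("INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')")
    ```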
---
 docs/dml-of-carbondata.md                          | 19 ++++-
 .../carbon/flink/TestCarbonPartitionWriter.scala   | 22 +++---
 .../org/apache/carbon/flink/TestCarbonWriter.scala | 87 ++++++++++++++++++++--
 .../spark/load/DataLoadProcessBuilderOnSpark.scala | 44 ++++++++++-
 .../management/CarbonInsertFromStageCommand.scala  | 78 +++++++++----------
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   | 10 ++-
 6 files changed, 190 insertions(+), 70 deletions(-)

diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index e148bd0..1cda75a 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -316,12 +316,29 @@ CarbonData DML statements are documented here,which includes:
   You can use this command to insert them into the table, so that making them visible for query.
   
   ```
-  INSERT INTO <CARBONDATA TABLE> STAGE
+  INSERT INTO <CARBONDATA TABLE> STAGE OPTIONS(property_name=property_value, ...)
   ```
+  **Supported Properties:**
+
+| Property                                                | Description                                                  |
+| ------------------------------------------------------- | ------------------------------------------------------------ |
+| [BATCH_FILE_COUNT](#batch_file_count)                   | The number of stage files per processing                     |
+
+-
+  You can use the following options to load data:
+
+  - ##### BATCH_FILE_COUNT: 
+    The number of stage files per processing.
+
+    ``` 
+    OPTIONS('batch_file_count'='5')
+    ```
 
   Examples:
   ```
   INSERT INTO table1 STAGE
+
+  INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')
   ```
 
 ### Load Data Using Static Partition 
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index fe2fa38..c92d6fc 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -34,7 +34,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Test
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
@@ -43,9 +42,8 @@ class TestCarbonPartitionWriter extends QueryTest {
 
   val tableName = "test_flink_partition"
 
-  @Test
-  def testLocal(): Unit = {
-    sql(s"drop table if exists $tableName").collect()
+  test("Writing flink data to local partition carbon table") {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
     sql(
       s"""
          | CREATE TABLE $tableName (stringField string, intField int, shortField short)
@@ -122,17 +120,16 @@ class TestCarbonPartitionWriter extends QueryTest {
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
 
     } finally {
-      sql(s"drop table if exists $tableName").collect()
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
       delDir(new File(dataPath))
     }
   }
 
-  @Test
-  def testComplexType(): Unit = {
-    sql(s"drop table if exists $tableName").collect()
+  test("Test complex type") {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
     sql(
       s"""
          | CREATE TABLE $tableName (stringField string, intField int, shortField short,
@@ -212,14 +209,14 @@ class TestCarbonPartitionWriter extends QueryTest {
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
 
-      val rows = sql(s"select * from $tableName limit 1").collect()
+      val rows = sql(s"SELECT * FROM $tableName limit 1").collect()
       assertResult(1)(rows.length)
       assertResult(Array[Byte](2, 3, 4))(rows(0).get(rows(0).fieldIndex("binaryfield")).asInstanceOf[GenericRowWithSchema](0))
 
     } finally {
-      sql(s"drop table if exists $tableName").collect()
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
       delDir(new File(dataPath))
     }
   }
@@ -231,7 +228,6 @@ class TestCarbonPartitionWriter extends QueryTest {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
     properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
-    properties.setProperty(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
     properties
   }
 
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index 9195863..a297dcf 100644
--- a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Test
 
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -36,9 +35,8 @@ class TestCarbonWriter extends QueryTest {
 
   val tableName = "test_flink"
 
-  @Test
-  def testLocal(): Unit = {
-    sql(s"drop table if exists $tableName").collect()
+  test("Writing flink data to local carbon table") {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
     sql(
       s"""
          | CREATE TABLE $tableName (stringField string, intField int, shortField short)
@@ -103,14 +101,91 @@ class TestCarbonWriter extends QueryTest {
 
       sql(s"INSERT INTO $tableName STAGE")
 
-      checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1000)))
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
 
       // ensure the stage snapshot file and all stage files are deleted
       assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
       assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
     } finally {
-      sql(s"drop table if exists $tableName").collect()
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
+      new File(dataPath).delete()
+    }
+  }
+
+  test("test batch_file_count option") {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+         | STORED AS carbondata
+      """.stripMargin
+    ).collect()
+
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+    val dataTempPath = rootPath + "/data/temp/"
+    val dataPath = rootPath + "/data/"
+    new File(dataPath).delete()
+    new File(dataPath).mkdir()
+
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
+
+      writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
+
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(1)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): Array[AnyRef] = {
+          val data = new Array[AnyRef](3)
+          data(0) = "test" + index
+          data(1) = index.asInstanceOf[AnyRef]
+          data(2) = 12345.asInstanceOf[AnyRef]
+          data
+        }
+
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(5000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        tablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
+      )
+      val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.addSink(streamSink)
+
+      try environment.execute
+      catch {
+        case exception: Exception =>
+          // TODO
+          throw new UnsupportedOperationException(exception)
+      }
+
+      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
+
+      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
+
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
       new File(dataPath).delete()
     }
   }
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index dc97cd9..ae859c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -22,16 +22,20 @@ import java.util.Comparator
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Accumulator, DataSkewRangePartitioner, TaskContext}
+import org.apache.hadoop.mapreduce.InputSplit
+import org.apache.spark.{Accumulator, CarbonInputMetrics, DataSkewRangePartitioner, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.execution.command.ExecutionErrors
-import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, StructField, StructType}
@@ -40,6 +44,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer
+import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat
 import org.apache.carbondata.processing.loading.{CarbonDataLoadConfiguration, DataField, DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
@@ -47,8 +52,9 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, TableOptionConstant}
-import org.apache.carbondata.spark.rdd.StringArrayRow
+import org.apache.carbondata.spark.rdd.{CarbonScanRDD, StringArrayRow}
 import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.store.CarbonRowReadSupport
 
 /**
  * Use sortBy operator in spark to load the data
@@ -423,6 +429,38 @@ object DataLoadProcessBuilderOnSpark {
     }
     loadModel
   }
+
+  /**
+   * create DataFrame based on the specified splits
+   */
+  def createInputDataFrame(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      splits: Seq[InputSplit]
+  ): DataFrame = {
+    val columns = carbonTable
+      .getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .toArray
+    val schema = SparkTypeConverter.createSparkSchema(carbonTable, columns)
+    val rdd: RDD[InternalRow] = new CarbonScanRDD[CarbonRow](
+      sparkSession,
+      columnProjection = new CarbonProjection(columns),
+      null,
+      carbonTable.getAbsoluteTableIdentifier,
+      carbonTable.getTableInfo.serialize,
+      carbonTable.getTableInfo,
+      new CarbonInputMetrics,
+      null,
+      classOf[SparkDataTypeConverterImpl],
+      classOf[CarbonRowReadSupport],
+      splits.asJava)
+      .map { row =>
+        new GenericInternalRow(row.getData.asInstanceOf[Array[Any]])
+      }
+    SparkSQLUtil.execute(rdd, schema, sparkSession)
+  }
 }
 
 class PrimtiveOrdering(dataType: DataType) extends Ordering[Object] {
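
The new helper is consumed by CarbonInsertFromStageCommand later in this patch; roughly, the call site looks like the sketch below (spark, table, splits and loadModel are illustrative variables standing in for the values available inside that command, and the splits are assumed to cover the stage files selected for the current batch):

```
// Sketch only: build a DataFrame over the selected splits, then hand it to the
// existing global-sort load path, as the command code further down does.
val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
  spark,
  Option(dataFrame),
  loadModel,
  SparkSQLUtil.sessionState(spark).newHadoopConf())
```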
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
index a4dd45b..0d1121d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertFromStageCommand.scala
@@ -28,7 +28,7 @@ import com.google.gson.Gson
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.InputSplit
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonEnv, CarbonUtils, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.util.SparkSQLUtil
 
@@ -56,7 +56,8 @@ import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
  */
 case class CarbonInsertFromStageCommand(
     databaseNameOp: Option[String],
-    tableName: String
+    tableName: String,
+    options: Map[String, String]
 ) extends DataCommand {
 
   @transient var LOGGER: Logger = _
@@ -113,7 +114,16 @@ case class CarbonInsertFromStageCommand(
       //   8) delete the snapshot file
 
       // 1) read all existing stage files
-      val stageFiles = listStageFiles(stagePath, hadoopConf)
+      val batchSize = try {
+        Integer.valueOf(options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString))
+      } catch {
+        case _: NumberFormatException =>
+          throw new MalformedCarbonCommandException("Option [batch_file_count] is not a number.")
+      }
+      if (batchSize < 1) {
+        throw new MalformedCarbonCommandException("Option [batch_file_count] is less than 1.")
+      }
+      }
+      val stageFiles = listStageFiles(stagePath, hadoopConf, batchSize)
       if (stageFiles.isEmpty) {
         // no stage files, so do nothing
         LOGGER.warn("files not found under stage metadata folder")
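
In isolation, the option handling added above behaves like this sketch (extracted and simplified; a plain IllegalArgumentException stands in for MalformedCarbonCommandException so the snippet is self-contained):

```
// Missing option -> effectively "load all stage files" (Integer.MAX_VALUE).
// Non-numeric values or values below 1 are rejected.
def parseBatchFileCount(options: Map[String, String]): Int = {
  val raw = options.getOrElse("batch_file_count", Integer.MAX_VALUE.toString)
  val batchSize =
    try Integer.valueOf(raw).intValue()
    catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException("Option [batch_file_count] is not a number.")
    }
  if (batchSize < 1) {
    throw new IllegalArgumentException("Option [batch_file_count] is less than 1.")
  }
  batchSize
}
```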
@@ -258,25 +268,14 @@ case class CarbonInsertFromStageCommand(
       LOGGER.info(s"start to load ${splits.size} files into " +
                   s"${table.getDatabaseName}.${table.getTableName}")
       val start = System.currentTimeMillis()
-      try {
-        CarbonUtils
-          .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-            table.getDatabaseName + CarbonCommonConstants.POINT + table.getTableName,
-            splits.map(s => s.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
-        val dataFrame = SparkSQLUtil.createInputDataFrame(spark, table)
-        DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
-          spark,
-          Option(dataFrame),
-          loadModel,
-          SparkSQLUtil.sessionState(spark).newHadoopConf()
-        ).map { row =>
+      val dataFrame = DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
+      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+        spark,
+        Option(dataFrame),
+        loadModel,
+        SparkSQLUtil.sessionState(spark).newHadoopConf()
+      ).map { row =>
           (row._1, FailureCauses.NONE == row._2._2.failureCauses)
-        }
-      } finally {
-        CarbonUtils
-          .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-            table.getDatabaseName + "." +
-            table.getTableName)
       }
       LOGGER.info(s"finish data loading, time taken 
${System.currentTimeMillis() - start}ms")
 
@@ -316,26 +315,11 @@ case class CarbonInsertFromStageCommand(
     val start = System.currentTimeMillis()
     partitionDataList.map {
       case (partition, splits) =>
-        LOGGER.info(s"start to load ${ splits.size } files into " +
-          s"${ table.getDatabaseName }.${ table.getTableName }. " +
-          s"Partition information: ${ partition.mkString(",") }")
-        val dataFrame = try {
-          // Segments should be set for query here, because consider a scenario where custom
-          // compaction is triggered, so it can happen that all the segments might be taken into
-          // consideration instead of custom segments if we do not set, leading to duplicate data in
-          // compacted segment. To avoid this, segments to be considered are to be set in threadset.
-          CarbonUtils
-            .threadSet(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-              table.getDatabaseName + CarbonCommonConstants.POINT +
-              table.getTableName,
-              splits.map(split => split.asInstanceOf[CarbonInputSplit].getSegmentId).mkString(","))
-          SparkSQLUtil.createInputDataFrame(spark, table)
-        } finally {
-          CarbonUtils.threadUnset(
-            CarbonCommonConstants.CARBON_INPUT_SEGMENTS + table.getDatabaseName +
-              CarbonCommonConstants.POINT +
-              table.getTableName)
-        }
+        LOGGER.info(s"start to load ${splits.size} files into " +
+          s"${table.getDatabaseName}.${table.getTableName}. " +
+          s"Partition information: ${partition.mkString(",")}")
+        val dataFrame =
+          DataLoadProcessBuilderOnSpark.createInputDataFrame(spark, table, splits)
         val columns = dataFrame.columns
         val header = columns.mkString(",")
         val selectColumns = columns.filter(!partition.contains(_))
@@ -457,7 +441,8 @@ case class CarbonInsertFromStageCommand(
    */
   private def listStageFiles(
       loadDetailsDir: String,
-      hadoopConf: Configuration
+      hadoopConf: Configuration,
+      batchSize: Int
   ): Array[(CarbonFile, CarbonFile)] = {
     val dir = FileFactory.getCarbonFile(loadDetailsDir, hadoopConf)
     if (dir.exists()) {
@@ -467,13 +452,20 @@ case class CarbonInsertFromStageCommand(
       }.map { file =>
         (file.getName.substring(0, file.getName.indexOf(".")), file)
       }.toMap
-      allFiles.filter { file =>
+      val stageFiles = allFiles.filter { file =>
         !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
       }.filter { file =>
         successFiles.contains(file.getName)
+      }.sortWith {
+        (file1, file2) => file1.getLastModifiedTime < file2.getLastModifiedTime
       }.map { file =>
         (file, successFiles(file.getName))
       }
+      if (stageFiles.length <= batchSize) {
+        stageFiles
+      } else {
+        stageFiles.dropRight(stageFiles.length - batchSize)
+      }
     } else {
       Array.empty
     }
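
Taken on its own, the batching behaviour added to listStageFiles amounts to the sketch below (StageEntry and selectBatch are illustrative names with simplified types, not the actual CarbonFile API):

```
// Stage files are ordered by last-modified time, oldest first, and only the first
// batchSize of them are kept, so repeated INSERT ... STAGE commands drain the
// stage directory in arrival order.
case class StageEntry(name: String, lastModifiedTime: Long)

def selectBatch(files: Seq[StageEntry], batchSize: Int): Seq[StageEntry] =
  files.sortWith(_.lastModifiedTime < _.lastModifiedTime).take(batchSize)
```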
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 10b661a..ee094d7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -523,12 +523,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   /**
-   * INSERT INTO [dbName.]tableName STAGE
+   * INSERT INTO [dbName.]tableName STAGE [OPTIONS (key1=value1, key2=value2, ...)]
    */
   protected lazy val insertStageData: Parser[LogicalPlan] =
-    INSERT ~ INTO ~> (ident <~ ".").? ~ ident <~ STAGE <~ opt(";") ^^ {
-      case dbName ~ tableName =>
-        CarbonInsertFromStageCommand(dbName, tableName)
+    INSERT ~ INTO ~> (ident <~ ".").? ~ ident ~ STAGE ~
+    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+      case dbName ~ tableName ~ stage ~ options =>
+        CarbonInsertFromStageCommand(dbName, tableName,
+          options.getOrElse(List[(String, String)]()).toMap[String, String])
     }
 
   protected lazy val cleanFiles: Parser[LogicalPlan] =
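
With this grammar change, both statement forms below are accepted and map to CarbonInsertFromStageCommand (an illustrative sketch using the sql() helper as in the test suite; the resulting command shown in the comment reflects the rule above, with an empty options map when OPTIONS is omitted):

```
// Parsed into CarbonInsertFromStageCommand(None, "table1", Map.empty)
sql("INSERT INTO table1 STAGE")
// Parsed into CarbonInsertFromStageCommand(None, "table1", Map("batch_file_count" -> "5"))
sql("INSERT INTO table1 STAGE OPTIONS('batch_file_count' = '5')")
```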
