This is an automated email from the ASF dual-hosted git repository. peacewong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/linkis.git
commit 9e9ab021943149a314cb71606e177945375fb49b Author: peacewong <[email protected]> AuthorDate: Tue Oct 10 21:08:29 2023 +0800 code optimize --- .../pipeline/executor/ExcelExecutor.scala | 6 +- .../spark/Interpreter/PythonInterpreter.scala | 75 ---------------------- 2 files changed, 4 insertions(+), 77 deletions(-) diff --git a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala index 42fd016e2..42c0e27cd 100644 --- a/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala +++ b/linkis-engineconn-plugins/pipeline/src/main/scala/org/apache/linkis/manager/engineplugin/pipeline/executor/ExcelExecutor.scala @@ -18,6 +18,7 @@ package org.apache.linkis.manager.engineplugin.pipeline.executor import org.apache.linkis.common.io.FsPath +import org.apache.linkis.common.utils.ResultSetUtils import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration import org.apache.linkis.manager.engineplugin.pipeline.conf.PipelineEngineConfiguration.PIPELINE_OUTPUT_ISOVERWRITE_SWITCH @@ -84,8 +85,9 @@ class ExcelExecutor extends PipeLineExecutor { if (fsPathListWithError == null) { throw new PipeLineErrorException(EMPTY_DIR.getErrorCode, EMPTY_DIR.getErrorDesc) } - fileSource = - FileSource.create(fsPathListWithError.getFsPaths.toArray(Array[FsPath]()), sourceFs) + val fsPathList = fsPathListWithError.getFsPaths + ResultSetUtils.sortByNameNum(fsPathList) + fileSource = FileSource.create(fsPathList.toArray(Array[FsPath]()), sourceFs) } if (!FileSource.isTableResultSet(fileSource)) { throw new PipeLineErrorException( diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala index dbbac2623..df6fe55f7 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/Interpreter/PythonInterpreter.scala @@ -76,78 +76,3 @@ object PythonInterpreter { } } - -object SQLSession extends Logging { - - def showDF(sc: SparkContext, jobGroup: String, df: Any, maxResult: Int = Int.MaxValue): String = { - val startTime = System.currentTimeMillis() - - val iterator = Utils.tryThrow(df.asInstanceOf[DataFrame].toLocalIterator)(t => { - sc.clearJobGroup() - t - }) - - var columns: List[Attribute] = null - // get field names - Utils.tryThrow({ - val qe = df.getClass.getMethod("queryExecution").invoke(df) - val a = qe.getClass.getMethod("analyzed").invoke(qe) - val seq = a.getClass.getMethod("output").invoke(a).asInstanceOf[Seq[Any]] - columns = seq.toList.asInstanceOf[List[Attribute]] - })(t => { - sc.clearJobGroup() - t - }) - var schema = new StringBuilder - schema ++= "%TABLE\n" - val nameSet = new mutable.HashSet[String]() - for (col <- columns) { - nameSet.add(col.name) - schema ++= col.name ++ "\t" - } - val trim = if (nameSet.size < columns.length) { - var schemaWithAlis = new StringBuilder - schemaWithAlis ++= "%TABLE\n" - for (col <- columns) { - val colName = col.qualifiedName - schemaWithAlis ++= colName ++ "\t" - } - logger.info("I AM IN LESS") - logger.info(schemaWithAlis.toString.trim) - schemaWithAlis.toString.trim - } else { - logger.info("I AM IN MORE") - logger.info(schema.toString.trim) - schema.toString.trim - } - val msg = FSFactory.getFs("").write(new FsPath(""), true) - msg.write(trim.getBytes("utf-8")) - - var index = 0 - Utils.tryThrow({ - while (index < maxResult && iterator.hasNext) { - msg.write("\n".getBytes("utf-8")) - val row = iterator.next() - columns.indices.foreach { i => - if (row.isNullAt(i)) msg.write("NULL".getBytes("utf-8")) - else msg.write(row.apply(i).asInstanceOf[Object].toString.getBytes("utf-8")) - if (i != columns.size - 1) { - msg.write("\t".getBytes("utf-8")) - } - } - index += 1 - } - })(t => { - sc.clearJobGroup() - t - }) - val colCount = if (columns != null) columns.size else 0 - logger.warn(s"Fetched $colCount col(s) : $index row(s).") - sc.clearJobGroup() - Utils.tryFinally({ - msg.flush() - msg.toString - }) { () => IOUtils.closeQuietly(msg) } - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
