wangyum commented on pull request #2878:
URL: https://github.com/apache/hive/pull/2878#issuecomment-994669275
This is an example of how Spark implements `DownloadDataOperation`:
```scala
private[hive] case class DownloadDataBlock(
path: Option[Path] = None,
offset: Option[Long] = None,
schema: Option[String] = None,
dataSize: Long)
private[hive] class SparkDownloadDataOperation(
val sqlContext: SQLContext,
parentSession: HiveSession,
tableName: String,
query: String,
format: String,
options: JMap[String, String],
runInBackground: Boolean)
extends Operation(
parentSession,
Map.empty[String, String].asJava,
OperationType.UNKNOWN_OPERATION,
runInBackground) with SparkOperation with QueryLogging with Logging {
private var result: DataFrame = _
private lazy val resultSchema: TableSchema = {
if (result == null || result.schema.isEmpty) {
new TableSchema(Arrays.asList(new FieldSchema("Result", "string", "")))
} else {
logInfo(s"Result Schema: ${result.schema}")
SparkExecuteStatementOperation.getTableSchema(result.schema)
}
}
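// Skip Hadoop's _SUCCESS marker and .crc checksum files when listing the download results.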
private val pathFilter = new PathFilter {
override def accept(path: Path): Boolean =
!path.getName.equals("_SUCCESS") && !path.getName.endsWith("crc")
}
private val defaultBlockSize = 10 * 1024 * 1024
// Please see CSVOptions for more details.
private val defaultOptions = Map(
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss",
"dateFormat" -> "yyyy-MM-dd",
"delimiter" -> ",",
"escape" -> "\"",
"compression" -> "gzip",
"header" -> "true",
"maxRecordsPerFile" ->"0",
// To avoid Zeta client timeout
"fetchBlockSize" -> defaultBlockSize.toString,
"maxFetchBlockTime" -> "30000",
// To avoid coalesce
"minFileSize" -> (defaultBlockSize - 1 * 1024 * 1024).toString)
private val writeOptions =
defaultOptions ++
Option(options).map(_.asScala).getOrElse(Map.empty[String, String]).toMap
private val numFiles = writeOptions.get("numFiles").map(_.toInt)
private val fetchSize = writeOptions("fetchBlockSize").toLong
private val maxFetchBlockTime = writeOptions("maxFetchBlockTime").toLong
private val minFileSize = writeOptions("minFileSize").toLong
private val downloadQuery = s"Generating download files with arguments " +
s"[${tableName}, ${query}, ${format}, ${writeOptions}]"
private val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
private val scratchDir =
sqlContext.conf.getConf(StaticSQLConf.SPARK_SCRATCH_DIR)
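// Temporary download files are written under
// <scratchDir>/DownloadData/<userName>/<sessionId> and removed in cleanup().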
private val pathPrefix = new Path(scratchDir + File.separator +
"DownloadData" + File.separator +
parentSession.getUserName + File.separator +
parentSession.getSessionHandle.getSessionId)
private val fs: FileSystem = pathPrefix.getFileSystem(hadoopConf)
private var iter: JIterator[DownloadDataBlock] = _
private var schemaStr: String = _
private var totalDataSize: Long = 0
override def close(): Unit = {
// CARMEL-4662 Fix Download query state is incorrect.
if (getStatus.getState eq OperationState.FINISHED) {
HiveThriftServer2.eventManager.onStatementFinish(statementId)
}
HiveThriftServer2.eventManager.onQueryExist(
statementId,
QueryLogObjectList(Option(result).map(_.queryExecution)),
QueryLogExtInfo(false, totalDataSize))
logInfo(s"CLOSING $statementId")
cleanup(OperationState.CLOSED)
sqlContext.sparkContext.clearJobGroup()
}
override def runInternal(): Unit = {
setState(OperationState.PENDING)
setHasResultSet(true)
if (!runInBackground) {
execute()
} else {
val sparkServiceUGI = HiveShimsUtils.getUGI()
val backgroundOperation = new Runnable() {
override def run(): Unit = {
val doAsAction = new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
try {
execute()
} catch {
case e: HiveSQLException =>
setOperationException(e)
log.error("Error generating download file: ", e)
}
}
}
try {
sparkServiceUGI.doAs(doAsAction)
} catch {
case e: Exception =>
setOperationException(new HiveSQLException(e))
logError("Error generating download file as user : " +
sparkServiceUGI.getShortUserName(), e)
}
}
}
try {
// This submit blocks if no background threads are available to run this operation.
val backgroundHandle =
parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation)
setBackgroundHandle(backgroundHandle)
} catch {
case rejected: RejectedExecutionException =>
setState(OperationState.ERROR)
throw new HiveSQLException("The background threadpool cannot
accept" +
" new task for execution, please retry the operation", rejected)
case NonFatal(e) =>
logError("Error generating download file in background", e)
setState(OperationState.ERROR)
throw new HiveSQLException(e)
}
}
}
private def execute(): Unit = {
statementId = getHandle.getHandleIdentifier.getPublicId.toString
setState(OperationState.RUNNING)
try {
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
// Use parent session's SessionState in this operation because such SessionState
// keeps some shared info per session, e.g. authorization information.
SessionState.setCurrentSessionState(parentSession.getSessionState)
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
sqlContext.sparkContext.listenerBus.
post(StatementStart(statementId, System.currentTimeMillis(),
sqlContext.sparkContext.getLocalProperty("spark.hive.session.id")))
HiveThriftServer2.eventManager.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
downloadQuery,
statementId,
parentSession.getUsername)
assert(fetchSize >= 1L * 1024 * 1024 && fetchSize <= 20L * 1024 * 1024,
s"fetchBlockSize(${fetchSize}) should be greater than 1M and less
than 20M.")
if (StringUtils.isNotEmpty(tableName) &&
StringUtils.isNotEmpty(query)) {
throw new HiveSQLException("Both table name and query are
specified.")
}
sqlContext.sparkContext.setLocalProperty(
SparkContext.SPARK_USER_RESOURCE_CONSUMER_ID,
parentSession.getUserName)
if (parentSession.getUserInfo != null) {
sqlContext.sparkContext.setLocalProperty(
SparkContext.SPARK_USER_RESOURCE_CONSUMER_PROFILE,
parentSession.getUserInfo.profile)
}
sqlContext.sparkContext.setJobGroup(statementId, downloadQuery)
logInfo(s"Running query [$statementId] in session " +
s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD
'$query'")
val resultPath = writeData(new Path(pathPrefix, statementId))
logQueryInfo(s"Running query [$statementId] in session " +
s"[${parentSession.getSessionHandle.getSessionId.toString}] DOWNLOAD
'$query'")
val dataSize = fs.getContentSummary(resultPath).getLength
logInfo(s"Try to download ${dataSize} bytes data from thriftserver.")
totalDataSize = dataSize
val list: JList[DownloadDataBlock] = new JArrayList[DownloadDataBlock]()
// Add total data size to first row.
list.add(DownloadDataBlock(schema = Some(schemaStr), dataSize = dataSize))
// and then add data.
fs.listStatus(resultPath,
pathFilter).map(_.getPath).sortBy(_.getName).foreach { path =>
val dataLen = fs.getFileStatus(path).getLen
// Cast to BigDecimal to avoid overflowing
val fetchBatchs =
BigDecimal(dataLen)./(BigDecimal(fetchSize)).setScale(0,
RoundingMode.CEILING).longValue()
assert(fetchBatchs < Int.MaxValue, "The fetch batch too large.")
(0 until fetchBatchs.toInt).foreach { i =>
val fetchSizeInBatch = if (i == fetchBatchs - 1) dataLen - i * fetchSize else fetchSize
list.add(DownloadDataBlock(
path = Some(path), offset = Some(i * fetchSize), dataSize = fetchSizeInBatch))
}
list.add(DownloadDataBlock(path = Some(path), dataSize = -1))
}
iter = list.iterator()
logInfo(s"Add ${list.size()} data blocks to be fetched.")
setState(OperationState.FINISHED)
logQueryInfo(s"Finished query [$statementId].")
} catch {
case NonFatal(e) =>
logQueryError(s"Error executing query [$statementId]", e)
setState(OperationState.ERROR)
HiveThriftServer2.eventManager.onStatementError(
statementId, Utils.findFirstCause(e).toString,
Utils.exceptionString(e))
val exception = new HiveSQLException(e)
setOperationException(exception)
}
}
private def writeData(path: Path): Path = withRetry {
result = (Option(tableName), Option(query), Option(format),
Option(options)) match {
case (Some(t), None, _, _) =>
sqlContext.table(t)
case (None, Some(q), _, _) =>
sqlContext.sql(q)
case _ =>
throw new HiveSQLException(s"Invalid arguments: ($tableName, $query,
$format, $options).")
}
schemaStr = result.schema.map(_.name).mkString(writeOptions("delimiter"))
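// Do not add an extra repartition when the plan already ends in a sort, shuffle
// exchange, or limit, so the existing output ordering/partitioning is kept.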
val needRepartition = result.queryExecution.sparkPlan match {
case _: SortExec => false
case _: TakeOrderedAndProjectExec => false
case ProjectExec(_, _: SortExec) => false
case AdaptiveSparkPlanExec(_: SortExec, _, _, _) => false
case AdaptiveSparkPlanExec(_: TakeOrderedAndProjectExec, _, _, _) =>
false
case AdaptiveSparkPlanExec(ProjectExec(_, _: SortExec), _, _, _) =>
false
case _: ShuffleExchangeExec => false
case ProjectExec(_, _: ShuffleExchangeExec) => false
case _: CollectLimitExec => false
case _: LimitExec => false
case _ => true
}
// Background: according to the official Hadoop FileSystem API spec,
// rename op's destination path must have a parent that exists,
// otherwise we may get unexpected result on the rename API.
// When downloading dataset as parquet format, if we configure a
// quota-free path and adopt FileOutputCommitter V1 algorithm, we will
// get the "IOException: Failed to rename FileStatus".
// Hence, the parent path should exist (see CARMEL-5150).
if (!fs.exists(path) && !fs.mkdirs(path)) {
logWarning(s"Failed to create parent download path ${path}")
}
val step1Path = new Path(path, "step1")
val outputFormat = Option(format).getOrElse("csv")
val (castCols, readSchema) = if (outputFormat.equalsIgnoreCase("csv")) {
// Support duplicate columns
val names = result.schema.map(_.name)
val renameDuplicateNames = if (names.distinct.length != names.length) {
val duplicateColumns = names.groupBy(identity).collect {
case (x, ys) if ys.length > 1 => x
}
result.logicalPlan.output.zipWithIndex.map {
case (col, index) if duplicateColumns.exists(_.equals(col.name)) =>
col.withName(col.name + index)
case (col, _) => col
}
} else {
result.logicalPlan.output
}
// Support Complex types for csv file
val output = renameDuplicateNames.map { col =>
col.dataType match {
case BinaryType => Column(col).cast(StringType).alias(col.name)
case NullType => Column(col).cast(StringType).alias(col.name)
case CalendarIntervalType =>
Column(col).cast(StringType).alias(col.name)
case ArrayType(_, _) =>
Column(col).cast(StringType).alias(col.name)
case MapType(_, _, _) =>
Column(col).cast(StringType).alias(col.name)
case StructType(_) => Column(col).cast(StringType).alias(col.name)
case _ => Column(col).alias(col.name)
}
}
(output, StructType(StructType.fromAttributes(renameDuplicateNames)
.map(_.copy(dataType = StringType))))
} else if (outputFormat.equalsIgnoreCase("parquet")) {
val output = result.logicalPlan.output.map { col =>
col.dataType match {
case BooleanType | ByteType | ShortType | IntegerType
| LongType | FloatType | DoubleType | BinaryType =>
Column(col).alias(col.name)
case _ => Column(col).cast(StringType).alias(col.name)
}
}
val newSchema = result.schema.map(s => s.dataType match {
case BooleanType | ByteType | ShortType | IntegerType
| LongType | FloatType | DoubleType | BinaryType => s
case _ => s.copy(dataType = StringType)
})
(output, StructType(newSchema))
} else {
val output = result.logicalPlan.output.map(col =>
Column(col).alias(col.name))
(output, result.schema)
}
val writePlan = if (!needRepartition) {
result.select(castCols: _*)
} else if (numFiles.nonEmpty) {
result.select(castCols: _*).repartition(numFiles.get)
} else {
result.select(castCols: _*).repartition(Column(Rand(1)))
}
writePlan.write
.options(writeOptions)
.format(outputFormat)
.mode(SaveMode.Overwrite)
.save(step1Path.toString)
val contentSummary = fs.getContentSummary(step1Path)
val dataSize = contentSummary.getLength
if (dataSize >
sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE)) {
throw QueryLevelRestrictionErrors.downloadDataSizeExceeded(
dataSize,
sqlContext.conf.getConf(HIVE_THRIFT_SERVER_DATA_DOWNLOAD_MAX_SIZE))
}
step1Path
}
// Limit download speed.
private var lastFetchTime = System.currentTimeMillis()
private var downloadedDataSize = 0L
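// Rows handed back to the client follow the layout built in execute():
// 1) the first row carries the delimited schema string and the total data size,
// 2) each data row carries a file name plus a chunk of at most fetchBlockSize bytes,
// 3) a row with dataSize == -1 marks the end of a file.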
override def getNextRowSet(orientation: FetchOrientation, maxRowsL: Long):
RowSet = {
val expectedMaxTime = math.min(maxFetchBlockTime,
(downloadedDataSize.toDouble / (50L * 1024 * 1024)) * 1000L).toLong
val downloadTime = System.currentTimeMillis() - lastFetchTime
if (downloadTime < expectedMaxTime) {
logInfo(s"Limit download speed ${downloadTime}, " +
s"expected max download time ${expectedMaxTime}")
Thread.sleep(expectedMaxTime - downloadTime)
}
lastFetchTime = System.currentTimeMillis()
if (getStatus.getState ne OperationState.FINISHED) {
throw getStatus.getOperationException
}
assertState(OperationState.FINISHED)
validateFetchOrientation(orientation,
JEnumSet.of(FetchOrientation.FETCH_NEXT))
val rowSet: RowSet = RowSetFactory.create(getDownloadSchema,
getProtocolVersion, false)
if (!iter.hasNext) {
rowSet
} else {
val maxRows = maxRowsL.toInt
var curRow = 0
while (curRow < maxRows && iter.hasNext) {
val dataBlock = iter.next()
val dataSize = dataBlock.dataSize
dataBlock.path match {
case Some(path) =>
if (dataSize >= 0) {
val buffer: Array[Byte] = new Array[Byte](dataSize.toInt)
Utils.tryWithResource(fs.open(path)) { is =>
is.seek(dataBlock.offset.get)
is.readFully(buffer)
}
// data row
rowSet.addRow(Array[AnyRef](path.getName, buffer, null,
Long.box(dataSize)))
downloadedDataSize = dataSize
} else {
// End of file row
rowSet.addRow(Array[AnyRef](path.getName, null, null,
Long.box(dataSize)))
}
case _ =>
// Schema row and total data size row
rowSet.addRow(Array[AnyRef](null, null, dataBlock.schema.get,
Long.box(dataSize)))
}
curRow += 1
}
rowSet
}
}
override def getResultSetSchema: TableSchema = {
if (writeOptions.get("useRealSchema").nonEmpty
&& writeOptions("useRealSchema").equalsIgnoreCase("true")) {
resultSchema
} else {
val ret = new TableSchema()
.addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to
be transferred.")
.addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be
transferred.")
.addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to
be transferred.")
.addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be
transferred in this fetch.")
ret
}
}
private def getDownloadSchema: TableSchema = {
new TableSchema()
.addPrimitiveColumn("FILE_NAME", Type.STRING_TYPE, "The file name to
be transferred.")
.addPrimitiveColumn("DATA", Type.BINARY_TYPE, "The data to be
transferred.")
.addPrimitiveColumn("SCHEMA", Type.STRING_TYPE, "The data schema to be
transferred.")
.addPrimitiveColumn("SIZE", Type.BIGINT_TYPE, "The size to be
transferred in this fetch.")
}
override def cancel(): Unit = {
if (statementId != null) {
HiveThriftServer2.eventManager.onStatementCanceled(statementId)
}
HiveThriftServer2.eventManager.onQueryExist(
statementId,
QueryLogObjectList(Option(result).map(_.queryExecution)),
QueryLogExtInfo(false, totalDataSize))
cleanup(OperationState.CANCELED)
}
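// Retry the write (at most twice) when the root cause is a FileNotFoundException that
// is not about shuffle files, refreshing the involved tables' metadata before retrying.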
private def withRetry[T](f: => T): T = {
val maxRetry = 2
var retryNum = 0
def retriable(t: Throwable): Boolean = {
var cur = t
while (retryNum < maxRetry && cur != null) {
Utils.findFirstCause(cur) match {
case f: FileNotFoundException if
!f.getMessage.contains("shuffle_") =>
// Some commands may fail when initiating the dataset, since it will trigger
// execution on dataset initialization. We need to manually build a
// QueryExecution to get the optimized plan.
val qe = if (result != null) {
result.queryExecution
} else {
val parsed = sqlContext.sessionState.sqlParser.parsePlan(query)
new QueryExecution(sqlContext.sparkSession, parsed)
}
qe.optimizedPlan.foreach {
case LogicalRelation(_, _, Some(table), _) =>
qe.sparkSession.sessionState.refreshTable(table.identifier.toString)
case HiveTableRelation(tableMeta, _, _, _, _, _) =>
qe.sparkSession.sessionState.refreshTable(tableMeta.identifier.toString)
case _ =>
}
return true
case c => cur = cur.getCause()
}
}
false
}
var res: Option[T] = None
do {
if (retryNum > 0) {
logInfo(s"Start to retry query $statementId.")
}
try {
res = Some(f)
} catch {
case e if retriable(e) =>
logError(s"Query $statementId failed out of error
${e.getCause.getMessage}")
retryNum += 1
case e: Throwable =>
throw e
}
} while (res.isEmpty)
res.get
}
private def cleanup(state: OperationState): Unit = {
setState(state)
if (runInBackground) {
val backgroundHandle = getBackgroundHandle()
if (backgroundHandle != null) {
backgroundHandle.cancel(true)
}
}
if (statementId != null) {
sqlContext.sparkContext.cancelJobGroup(statementId, Some("Clean up SparkDownloadData"))
sqlContext.queryLoadLimitationManager.clean(statementId)
}
// Delete temp files
try {
fs.delete(pathPrefix, true)
} catch {
case e: IOException =>
log.warn("Failed to remove download temp files.", e)
}
sqlContext.sparkContext.closeJobGroup(statementId)
}
}
```
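Not part of the code above, but in case it helps review: here is a minimal, dependency-free sketch of the block-splitting protocol that `execute()` builds and `getNextRowSet` serves. The `FileEntry`/`Block` types, the object name, and the 10 MB default block size are illustrative assumptions only, not part of the actual implementation.
```scala
import scala.math.BigDecimal.RoundingMode

object DownloadBlockSketch {
  // Hypothetical stand-ins for the Hadoop Path/FileStatus used in the real operation.
  case class FileEntry(name: String, length: Long)
  case class Block(
      fileName: Option[String] = None,
      offset: Option[Long] = None,
      schema: Option[String] = None,
      dataSize: Long)

  // Mirrors how execute() above turns the result files into DownloadDataBlock rows.
  def planBlocks(
      schemaStr: String,
      files: Seq[FileEntry],
      fetchSize: Long = 10L * 1024 * 1024): Seq[Block] = {
    // First row: schema string plus the total data size.
    val header = Block(schema = Some(schemaStr), dataSize = files.map(_.length).sum)
    val dataRows = files.sortBy(_.name).flatMap { f =>
      // Ceiling division, like the BigDecimal-based computation above.
      val batches =
        (BigDecimal(f.length) / BigDecimal(fetchSize)).setScale(0, RoundingMode.CEILING).toLong
      val chunks = (0L until batches).map { i =>
        val size = if (i == batches - 1) f.length - i * fetchSize else fetchSize
        Block(fileName = Some(f.name), offset = Some(i * fetchSize), dataSize = size)
      }
      // Trailing marker row: dataSize == -1 tells the client the file is complete.
      chunks :+ Block(fileName = Some(f.name), dataSize = -1L)
    }
    header +: dataRows
  }

  def main(args: Array[String]): Unit = {
    // A 25 MB file yields three data blocks plus an end-of-file marker.
    planBlocks("id,name", Seq(FileEntry("part-00000", 25L * 1024 * 1024))).foreach(println)
  }
}
```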