[spark] branch master updated (f17f1d0 -> 02a0cde)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from f17f1d0  [SPARK-28778][MESOS] Fixed executors advertised address when running in virtual network
 add 02a0cde  [SPARK-28723][SQL] Upgrade to Hive 2.3.6 for HiveMetastore Client and Hadoop-3.2 profile

No new revisions were added by this update.

Summary of changes:
 docs/building-spark.md                                       |  4 ++--
 docs/sql-data-sources-hive-tables.md                         |  2 +-
 docs/sql-migration-guide-hive-compatibility.md               |  2 +-
 pom.xml                                                      |  2 +-
 .../spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala      |  3 +--
 .../sql/hive/thriftserver/HiveThriftServer2Suites.scala      |  4 ++--
 .../spark/sql/hive/thriftserver/ThriftserverShimUtils.scala  |  7 +++
 .../spark/sql/hive/thriftserver/ThriftserverShimUtils.scala  | 12
 .../src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala |  2 +-
 .../apache/spark/sql/hive/client/IsolatedClientLoader.scala  |  2 +-
 .../scala/org/apache/spark/sql/hive/client/package.scala     |  2 +-
 11 files changed, 30 insertions(+), 12 deletions(-)
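For readers wiring this up: the commit above changes which Hive client ships with the Hadoop-3.2 profile, and the metastore client version can also be pinned per session through Spark's documented configuration keys (the same keys covered by the docs files in the diffstat). A minimal Scala sketch, with illustrative values ("2.3.6", "maven") rather than statements about the defaults:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of pinning the Hive metastore client explicitly; the
// version and jar-resolution values below are illustrative, not defaults.
val spark = SparkSession.builder()
  .appName("hive-metastore-pin")
  .config("spark.sql.hive.metastore.version", "2.3.6") // Hive client version to load
  .config("spark.sql.hive.metastore.jars", "maven")    // resolve the client jars from Maven
  .enableHiveSupport()
  .getOrCreate()
```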
[spark] branch master updated (573b1cb -> f17f1d0)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 573b1cb  [SPARK-28858][ML][PYSPARK] add tree-based transformation in the py side
 add f17f1d0  [SPARK-28778][MESOS] Fixed executors advertised address when running in virtual network

No new revisions were added by this update.

Summary of changes:
 .../executor/CoarseGrainedExecutorBackend.scala    |  8 ++-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 57 --
 .../cluster/mesos/MesosSchedulerBackendUtil.scala  | 17 ++-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  | 24 +
 .../mesos/MesosSchedulerBackendUtilSuite.scala     | 34 ++---
 5 files changed, 102 insertions(+), 38 deletions(-)
[spark] branch master updated (d25cbd4 -> 573b1cb)
This is an automated email from the ASF dual-hosted git repository.

cutlerb pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from d25cbd4  [SPARK-28839][CORE] Avoids NPE in context cleaner when dynamic allocation and shuffle service are on
 add 573b1cb  [SPARK-28858][ML][PYSPARK] add tree-based transformation in the py side

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/classification.py | 54 +---
 python/pyspark/ml/regression.py     | 71 ++---
 2 files changed, 84 insertions(+), 41 deletions(-)
[spark] branch master updated (07c4b9b -> d25cbd4)
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 07c4b9b  Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
 add d25cbd4  [SPARK-28839][CORE] Avoids NPE in context cleaner when dynamic allocation and shuffle service are on

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/scheduler/dynalloc/ExecutorMonitor.scala |  9 ++---
 .../spark/scheduler/dynalloc/ExecutorMonitorSuite.scala   | 13 +
 2 files changed, 19 insertions(+), 3 deletions(-)
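The diffstat alone doesn't show the fix. As a generic illustration only, not the actual ExecutorMonitor patch, the usual Scala defense against this class of NPE is to wrap a possibly-null Java lookup in Option before converting and iterating:

```scala
import scala.collection.JavaConverters._

// Generic illustration only -- not the actual ExecutorMonitor change: a
// lookup into a Java map returns null on a missing key, so guard it with
// Option before converting, instead of dereferencing it directly.
def trackedShuffleIds(
    byExecutor: java.util.Map[String, java.util.Set[java.lang.Integer]],
    execId: String): Set[Int] =
  Option(byExecutor.get(execId))
    .map(_.asScala.map(_.intValue()).toSet)
    .getOrElse(Set.empty)
```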
[spark] branch master updated: Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 07c4b9b  Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"

07c4b9b is described below

commit 07c4b9bd1fb055f283af076b2a995db8f6efe7a5
Author: Xiao Li
AuthorDate: Fri Aug 23 07:41:39 2019 -0700

    Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"

    This reverts commit 485ae6d1818e8756a86da38d6aefc8f1dbde49c2.

    Closes #25563 from gatorsmile/revert.

    Authored-by: Xiao Li
    Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/execution/command/CommandUtils.scala | 11 --
 .../execution/datasources/HadoopFsRelation.scala   | 13 +++
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++--
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 40 --
 4 files changed, 15 insertions(+), 63 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 9a9d66b..b644e6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -344,15 +344,4 @@ object CommandUtils extends Logging {
   private def isDataPath(path: Path, stagingDir: String): Boolean = {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
-
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
-    try {
-      val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
-        defaultSize
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index f7d2315..d278802 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,12 +17,13 @@
 package org.apache.spark.sql.execution.datasources

-import org.apache.hadoop.fs.Path
+import java.util.Locale
+
+import scala.collection.mutable

 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
 import org.apache.spark.sql.types.{StructField, StructType}

@@ -70,13 +71,7 @@ case class HadoopFsRelation(
   override def sizeInBytes: Long = {
     val compressionFactor = sqlContext.conf.fileCompressionFactor
-    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
-    location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
-        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
-          defaultSize)
-      case _ => defaultSize
-    }
+    (location.sizeInBytes * compressionFactor).toLong
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d09c0ab..7b28e4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
   ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -118,8 +118,16 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
       if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
-
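For readers skimming the revert, the removed helper boiled down to a filesystem size probe with a safe default. A standalone sketch, simplified from the diff above:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import scala.util.control.NonFatal

// Standalone sketch, simplified from the diff above: ask the filesystem for
// the total byte count under the table's root path, and fall back to a
// caller-supplied default rather than failing planning on a stats lookup.
def sizeInBytesFallBack(path: Path, hadoopConf: Configuration, defaultSize: Long): Long =
  try {
    path.getFileSystem(hadoopConf).getContentSummary(path).getLength
  } catch {
    case NonFatal(_) => defaultSize // the size is only a statistics hint
  }
```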
[spark] branch branch-2.4 updated (b913abd -> 0a5efc3)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.

from b913abd  [SPARK-27330][SS][2.4] support task abort in foreach writer
 add 0a5efc3  [SPARK-28025][SS][2.4] Fix FileContextBasedCheckpointFileManager leaking crc files

No new revisions were added by this update.

Summary of changes:
 .../streaming/CheckpointFileManager.scala          | 14 ++
 .../streaming/CheckpointFileManagerSuite.scala     | 16
 .../execution/streaming/HDFSMetadataLogSuite.scala | 30 ++
 3 files changed, 55 insertions(+), 5 deletions(-)
[spark] branch branch-2.4 updated (0415d9d -> b913abd)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 0415d9d  [SPARK-28642][SQL][2.4] Hide credentials in show create table
 add b913abd  [SPARK-27330][SS][2.4] support task abort in foreach writer

No new revisions were added by this update.

Summary of changes:
 .../streaming/sources/ForeachWriterProvider.scala | 17 +---
 .../streaming/sources/ForeachWriterSuite.scala    | 30 ++
 2 files changed, 44 insertions(+), 3 deletions(-)
[spark] branch master updated (1472e66 -> 8258660)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 1472e66  [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans
 add 8258660  [SPARK-28741][SQL] Optional mode: throw exceptions when casting to integers causes overflow

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/Cast.scala      | 173 +++--
 .../sql/catalyst/expressions/arithmetic.scala      |   4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    |   8 +-
 .../scala/org/apache/spark/sql/types/Decimal.scala |  88 +++
 .../org/apache/spark/sql/types/DecimalType.scala   |   2 +
 .../org/apache/spark/sql/types/DoubleType.scala    |   2 +
 .../org/apache/spark/sql/types/FloatType.scala     |   2 +
 .../org/apache/spark/sql/types/numerics.scala      |  75 -
 .../expressions/ArithmeticExpressionSuite.scala    |  24 +--
 .../spark/sql/catalyst/expressions/CastSuite.scala | 111 +
 10 files changed, 454 insertions(+), 35 deletions(-)
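The new optional mode turns a silently wrapping narrowing cast into an error. As an illustration of the behavior only, not Spark's actual `Cast` implementation, a checked long-to-int cast looks like this:

```scala
// Behavior illustration only -- not Spark's Cast code: under a strict mode
// like the one this commit adds, a narrowing cast that would lose
// information raises instead of silently truncating.
def castLongToIntChecked(v: Long): Int =
  if (v < Int.MinValue || v > Int.MaxValue) {
    throw new ArithmeticException(s"Casting $v to int causes overflow")
  } else {
    v.toInt
  }

// castLongToIntChecked(3000000000L) throws; the default non-strict cast
// wraps the same value around to -1294967296.
```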
[spark] branch master updated: [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1472e66  [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans

1472e66 is described below

commit 1472e664baf74d60d88d8509cfd7fc3d8c48b2bf
Author: Ali Afroozeh
AuthorDate: Fri Aug 23 13:29:32 2019 +0200

    [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans

    ## What changes were proposed in this pull request?

    Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans, for example:

    ```
    ReusedExchange d_date_sk#827, BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) [id=#2710]
    ```

    Where `2710` is the id of the reused exchange.

    ## How was this patch tested?

    Passes existing tests

    Closes #25434 from dbaliafroozeh/ImplementStringArgsExchangeSubqueryExec.

    Authored-by: Ali Afroozeh
    Signed-off-by: herman
---
 .../src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 8
 .../org/apache/spark/sql/execution/basicPhysicalOperators.scala   | 2 ++
 .../scala/org/apache/spark/sql/execution/exchange/Exchange.scala  | 2 ++
 .../org/apache/spark/sql/execution/debug/DebuggingSuite.scala     | 4 ++--
 4 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2baf2e5..ba89ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution

 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
+import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.mutable.ArrayBuffer

@@ -46,6 +47,11 @@ object SparkPlan {
   /** The [[LogicalPlan]] inherited from its ancestor. */
   val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited")
+
+  private val nextPlanId = new AtomicInteger(0)
+
+  /** Register a new SparkPlan, returning its SparkPlan ID */
+  private[execution] def newPlanId(): Int = nextPlanId.getAndIncrement()
 }

 /**
@@ -64,6 +70,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

   protected def sparkContext = sqlContext.sparkContext

+  val id: Int = SparkPlan.newPlanId()
+
   // sqlContext will be null when SparkPlan nodes are created without the active sessions.
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 7204548..b74dd95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -731,6 +731,8 @@ case class SubqueryExec(name: String, child: SparkPlan)
   override def executeCollect(): Array[InternalRow] = {
     ThreadUtils.awaitResult(relationFuture, Duration.Inf)
   }
+
+  override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]")
 }

 object SubqueryExec {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index ea0778b..153645d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -40,6 +40,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  */
 abstract class Exchange extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output
+
+  override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]")
 }

 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index aaf1fe4..7a8da7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -61,8 +61,8 @@ class DebuggingSuite extends SharedSparkSession {
     }

     val output = captured.toString()
-    assert(output.contains(
-      """== BroadcastExchange
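The diff above (truncated) already contains the whole mechanism. Condensed into a standalone sketch, the pattern is a process-wide AtomicInteger in a companion object that stamps each node with a unique id at construction time:

```scala
import java.util.concurrent.atomic.AtomicInteger

// Condensed, standalone version of the pattern in the diff above: a
// process-wide counter in the companion object hands every node a unique
// id at construction, so two prints of the same node always agree.
object Node { private val next = new AtomicInteger(0) }

abstract class Node {
  val id: Int = Node.next.getAndIncrement()
  def stringArgs: Iterator[Any] = Iterator(s"[id=#$id]")
}

class ExchangeLike extends Node
// new ExchangeLike().stringArgs.mkString  ==>  "[id=#0]" for the first node
```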
[spark] branch master updated (1fd7f29 -> aef7ca1)
This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 1fd7f29  [SPARK-28857][INFRA] Clean up the comments of PR template during merging
 add aef7ca1  [SPARK-28836][SQL] Remove the canonicalize(attributes) method from PlanExpression

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/catalyst/expressions/subquery.scala      | 10 --
 .../apache/spark/sql/catalyst/plans/QueryPlan.scala    | 18 --
 .../spark/sql/execution/DataSourceScanExec.scala       |  4 ++--
 .../sql/execution/columnar/InMemoryRelation.scala      |  2 +-
 .../sql/execution/columnar/InMemoryTableScanExec.scala |  4 ++--
 .../sql/execution/datasources/LogicalRelation.scala    |  2 +-
 .../sql/execution/datasources/v2/BatchScanExec.scala   |  2 +-
 .../org/apache/spark/sql/execution/subquery.scala      |  5 -
 .../spark/sql/hive/execution/HiveTableScanExec.scala   |  2 +-
 9 files changed, 20 insertions(+), 29 deletions(-)
[spark] branch master updated (98e1a4c -> 1fd7f29)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 98e1a4c  [SPARK-28319][SQL] Implement SHOW TABLES for Data Source V2 Tables
 add 1fd7f29  [SPARK-28857][INFRA] Clean up the comments of PR template during merging

No new revisions were added by this update.

Summary of changes:
 dev/merge_spark_pr.py | 19 ++-
 1 file changed, 18 insertions(+), 1 deletion(-)
[spark] branch branch-2.4 updated (e468576 -> 0415d9d)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git.

from e468576  [SPARK-28699][CORE][2.4] Fix a corner case for aborting indeterminate stage
 add 0415d9d  [SPARK-28642][SQL][2.4] Hide credentials in show create table

No new revisions were added by this update.

Summary of changes:
 .../spark/sql/execution/command/tables.scala  |  2 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 29 +-
 2 files changed, 29 insertions(+), 2 deletions(-)
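The diffstat doesn't show the masking itself. As a generic sketch of the idea only, not the actual Spark patch, hiding credentials when rendering table options amounts to filtering values by sensitive-looking keys:

```scala
// Generic sketch of the masking idea -- not the actual Spark change: when
// rendering table options (e.g. for SHOW CREATE TABLE), replace the values
// of keys that look sensitive before they reach any output.
def redactOptions(options: Map[String, String]): Map[String, String] = {
  val sensitiveKeys = Seq("password", "secret", "token")
  options.map { case (key, value) =>
    if (sensitiveKeys.exists(s => key.toLowerCase.contains(s))) key -> "*********(redacted)"
    else key -> value
  }
}

// redactOptions(Map("url" -> "jdbc:h2:mem:db", "password" -> "hunter2"))
//   ==> Map(url -> jdbc:h2:mem:db, password -> *********(redacted))
```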
[spark] branch master updated (9976b87 -> 98e1a4c)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 9976b87  [SPARK-28835][SQL][TEST] Add TPCDSSchema trait
 add 98e1a4c  [SPARK-28319][SQL] Implement SHOW TABLES for Data Source V2 Tables

No new revisions were added by this update.

Summary of changes:
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |   2 +-
 .../spark/sql/catalog/v2/LookupCatalog.scala       |  18
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  11 +-
 .../plans/logical/basicLogicalOperators.scala      |  11 ++
 .../plans/logical/sql/ShowTablesStatement.scala    |   8 +-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala |  17 +++-
 .../spark/sql/execution/SparkSqlParser.scala       |  15 ---
 .../datasources/DataSourceResolution.scala         |  29 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |   5 +-
 .../{AlterTableExec.scala => ShowTablesExec.scala} |  43 +---
 .../sql/sources/v2/DataSourceV2SQLSuite.scala      | 112 -
 .../sql/sources/v2/TestInMemoryTableCatalog.scala  |  72 -
 12 files changed, 296 insertions(+), 47 deletions(-)
 copy core/src/main/scala/org/apache/spark/executor/package.scala => sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ShowTablesStatement.scala (78%)
 copy sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/{AlterTableExec.scala => ShowTablesExec.scala} (51%)
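A hypothetical usage sketch of the new statement follows; `testcat`, `ns1`, and the catalog implementation class are made-up names, since SHOW TABLES against a v2 catalog requires one to be registered first:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage; "testcat" and "ns1" are made-up names, and the
// catalog class is whatever v2 catalog implementation is on the classpath.
val spark = SparkSession.builder()
  .appName("show-tables-v2")
  .config("spark.sql.catalog.testcat", "com.example.MyTableCatalog") // assumed impl
  .getOrCreate()

spark.sql("SHOW TABLES FROM testcat.ns1").show()
```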
[spark] branch master updated: [SPARK-28835][SQL][TEST] Add TPCDSSchema trait
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 9976b87  [SPARK-28835][SQL][TEST] Add TPCDSSchema trait

9976b87 is described below

commit 9976b876f1802a05b7df9b9eb9fb44e2ff76a1ec
Author: Ali Afroozeh
AuthorDate: Thu Aug 22 23:18:46 2019 -0700

    [SPARK-28835][SQL][TEST] Add TPCDSSchema trait

    ### What changes were proposed in this pull request?

    This PR extracts the schema information of TPCDS tables into a separate class called `TPCDSSchema` which can be reused for other testing purposes

    ### How was this patch tested?

    This PR is only a refactoring for tests and passes existing tests

    Closes #25535 from dbaliafroozeh/IntroduceTPCDSSchema.

    Authored-by: Ali Afroozeh
    Signed-off-by: Dongjoon Hyun
---
 .../org/apache/spark/sql/TPCDSQuerySuite.scala   | 290 +---
 .../{TPCDSQuerySuite.scala => TPCDSSchema.scala} | 487 -
 2 files changed, 189 insertions(+), 588 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala
index 817224d..a668434 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQuerySuite.scala
@@ -24,295 +24,13 @@ import org.apache.spark.sql.internal.SQLConf
  * This test suite ensures all the TPC-DS queries can be successfully analyzed, optimized
  * and compiled without hitting the max iteration threshold.
  */
-class TPCDSQuerySuite extends BenchmarkQueryTest {
+class TPCDSQuerySuite extends BenchmarkQueryTest with TPCDSSchema {

   override def beforeAll() {
     super.beforeAll()
-
-    sql(
-      """
-        |CREATE TABLE `catalog_page` (
-        |`cp_catalog_page_sk` INT, `cp_catalog_page_id` STRING, `cp_start_date_sk` INT,
-        |`cp_end_date_sk` INT, `cp_department` STRING, `cp_catalog_number` INT,
-        |`cp_catalog_page_number` INT, `cp_description` STRING, `cp_type` STRING)
-        |USING parquet
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE `catalog_returns` (
-        |`cr_returned_date_sk` INT, `cr_returned_time_sk` INT, `cr_item_sk` INT,
-        |`cr_refunded_customer_sk` INT, `cr_refunded_cdemo_sk` INT, `cr_refunded_hdemo_sk` INT,
-        |`cr_refunded_addr_sk` INT, `cr_returning_customer_sk` INT, `cr_returning_cdemo_sk` INT,
-        |`cr_returning_hdemo_sk` INT, `cr_returning_addr_sk` INT, `cr_call_center_sk` INT,
-        |`cr_catalog_page_sk` INT, `cr_ship_mode_sk` INT, `cr_warehouse_sk` INT, `cr_reason_sk` INT,
-        |`cr_order_number` INT, `cr_return_quantity` INT, `cr_return_amount` DECIMAL(7,2),
-        |`cr_return_tax` DECIMAL(7,2), `cr_return_amt_inc_tax` DECIMAL(7,2), `cr_fee` DECIMAL(7,2),
-        |`cr_return_ship_cost` DECIMAL(7,2), `cr_refunded_cash` DECIMAL(7,2),
-        |`cr_reversed_charge` DECIMAL(7,2), `cr_store_credit` DECIMAL(7,2),
-        |`cr_net_loss` DECIMAL(7,2))
-        |USING parquet
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE `customer` (
-        |`c_customer_sk` INT, `c_customer_id` STRING, `c_current_cdemo_sk` INT,
-        |`c_current_hdemo_sk` INT, `c_current_addr_sk` INT, `c_first_shipto_date_sk` INT,
-        |`c_first_sales_date_sk` INT, `c_salutation` STRING, `c_first_name` STRING,
-        |`c_last_name` STRING, `c_preferred_cust_flag` STRING, `c_birth_day` INT,
-        |`c_birth_month` INT, `c_birth_year` INT, `c_birth_country` STRING, `c_login` STRING,
-        |`c_email_address` STRING, `c_last_review_date` INT)
-        |USING parquet
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE `customer_address` (
-        |`ca_address_sk` INT, `ca_address_id` STRING, `ca_street_number` STRING,
-        |`ca_street_name` STRING, `ca_street_type` STRING, `ca_suite_number` STRING,
-        |`ca_city` STRING, `ca_county` STRING, `ca_state` STRING, `ca_zip` STRING,
-        |`ca_country` STRING, `ca_gmt_offset` DECIMAL(5,2), `ca_location_type` STRING)
-        |USING parquet
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE `customer_demographics` (
-        |`cd_demo_sk` INT, `cd_gender` STRING, `cd_marital_status` STRING,
-        |`cd_education_status` STRING, `cd_purchase_estimate` INT, `cd_credit_rating` STRING,
-        |`cd_dep_count` INT, `cd_dep_employed_count` INT, `cd_dep_college_count` INT)
-        |USING parquet
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE `date_dim` (
-        |`d_date_sk` INT, `d_date_id` STRING, `d_date` DATE,
-        |`d_month_seq` INT, `d_week_seq` INT, `d_quarter_seq` INT, `d_year` INT, `d_dow` INT,
-        |`d_moy` INT, `d_dom` INT, `d_qoy` INT, `d_fy_year` INT, `d_fy_quarter_seq` INT,
-
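The mechanics of the refactoring are plain Scala trait mix-in. A minimal sketch with made-up names (the real `TPCDSSchema` trait carries the CREATE TABLE DDL shown being deleted above):

```scala
// Minimal sketch of the mix-in pattern with made-up names; the real
// TPCDSSchema trait carries the table DDL being extracted above.
trait SchemaDefinitions {
  // table name -> column definitions, reusable by any suite that mixes this in
  protected val tableColumns: Map[String, String] = Map(
    "catalog_page" -> "`cp_catalog_page_sk` INT, `cp_catalog_page_id` STRING")
}

class QuerySuiteBase {
  def sql(text: String): Unit = println(text) // stand-in for the real test harness
}

class SampleQuerySuite extends QuerySuiteBase with SchemaDefinitions {
  def createAllTables(): Unit =
    tableColumns.foreach { case (name, columns) =>
      sql(s"CREATE TABLE `$name` ($columns) USING parquet")
    }
}
```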
[spark] branch master updated: [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 406c533  [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files

406c533 is described below

commit 406c5331ff8937120af465219c8f443ee00a97fb
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Thu Aug 22 23:10:16 2019 -0700

    [SPARK-28025][SS] Fix FileContextBasedCheckpointFileManager leaking crc files

    ### What changes were proposed in this pull request?

    This PR fixes the leak of crc files from CheckpointFileManager when FileContextBasedCheckpointFileManager is being used.

    Spark hits the Hadoop bug [HADOOP-16255](https://issues.apache.org/jira/browse/HADOOP-16255), which appears to be long-standing. The cause is that there are two `renameInternal` methods:

    ```
    public void renameInternal(Path src, Path dst)

    public void renameInternal(final Path src, final Path dst, boolean overwrite)
    ```

    Both should be overridden to handle all cases, but ChecksumFs only overrides the two-parameter method; when the three-parameter variant is called, FilterFs.renameInternal(...) runs instead and performs the rename with RawLocalFs as the underlying filesystem.

    The bug is related to FileContext, so FileSystemBasedCheckpointFileManager is not affected.

    [SPARK-17475](https://issues.apache.org/jira/browse/SPARK-17475) added a workaround for this bug, but [SPARK-23966](https://issues.apache.org/jira/browse/SPARK-23966) appears to have brought a regression.

    This PR deletes the crc file on a "best-effort" basis when renaming, since failing to delete a crc file is not critical enough to fail the task.

    ### Why are the changes needed?

    This PR prevents crc files from accumulating even as batches are purged. Too many files in the same directory often hurts performance, and each crc file occupies more space on disk than its own size, so the waste can become nontrivial once batches go up to 10+.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Some unit tests are modified to check leakage of crc files.

    Closes #25488 from HeartSaVioR/SPARK-28025.

    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Shixiong Zhu
---
 .../streaming/CheckpointFileManager.scala          | 14 ++
 .../streaming/CheckpointFileManagerSuite.scala     | 16
 .../execution/streaming/HDFSMetadataLogSuite.scala | 30 ++
 3 files changed, 55 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
index fe6362d..26f42b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala
@@ -327,6 +327,8 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
   override def renameTempFile(srcPath: Path, dstPath: Path, overwriteIfPossible: Boolean): Unit = {
     import Options.Rename._
     fc.rename(srcPath, dstPath, if (overwriteIfPossible) OVERWRITE else NONE)
+    // TODO: this is a workaround of HADOOP-16255 - remove this when HADOOP-16255 is resolved
+    mayRemoveCrcFile(srcPath)
   }

@@ -343,5 +345,17 @@ class FileContextBasedCheckpointFileManager(path: Path, hadoopConf: Configuratio
     case _: LocalFs | _: RawLocalFs => true // LocalFs = RawLocalFs + ChecksumFs
     case _ => false
   }
+
+  private def mayRemoveCrcFile(path: Path): Unit = {
+    try {
+      val checksumFile = new Path(path.getParent, s".${path.getName}.crc")
+      if (exists(checksumFile)) {
+        // checksum file exists, deleting it
+        delete(checksumFile)
+      }
+    } catch {
+      case NonFatal(_) => // ignore, we are removing crc file as "best-effort"
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
index c57b40c..79bcd49 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManagerSuite.scala
@@ -78,6 +78,22 @@ abstract class CheckpointFileManagerTests extends SparkFunSuite with SQLHelper {
     assert(fm.exists(path))
     fm.createAtomic(path, overwriteIfPossible = true).close()  // should not throw exception
+
+    // crc file should not be leaked when origin file doesn't exist.
+    // The implementation of
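The fix hinges on Hadoop's checksum-file naming convention. A tiny standalone sketch of the path computation `mayRemoveCrcFile` performs above:

```scala
import org.apache.hadoop.fs.Path

// Standalone sketch of the convention the fix relies on: Hadoop's ChecksumFs
// stores the checksum for "dir/name" as the hidden sibling "dir/.name.crc",
// which is exactly the path mayRemoveCrcFile computes and deletes.
def checksumSibling(p: Path): Path =
  new Path(p.getParent, s".${p.getName}.crc")

// checksumSibling(new Path("/ckpt/offsets/42"))  ==>  /ckpt/offsets/.42.crc
```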