aokolnychyi commented on a change in pull request #624: Update SparkTableUtil
to use SessionCatalog and proper MetricsConfig
URL: https://github.com/apache/incubator-iceberg/pull/624#discussion_r344237820
##########
File path: spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
##########
@@ -352,54 +428,93 @@ object SparkTableUtil {
}
/**
- * Import a spark table to a iceberg table.
+ * Import files from an existing Spark table to an Iceberg table.
*
- * The import uses the spark session to get table metadata. It assumes no
- * operation is going on original table and target table and thus is not
+ * The import uses the Spark session to get table metadata. It assumes no
+ * operation is going on the original and target table and thus is not
* thread-safe.
*
- * @param source the database name of the table to be import
- * @param stagingDir the staging directory to store temporary manifest file
- * @param table the target table to import
+ * @param spark a Spark session
+ * @param sourceTableIdent an identifier of the source Spark table
+ * @param targetTable an Iceberg table where to import the data
+ * @param stagingDir a staging directory to store temporary manifest files
*/
def importSparkTable(
- source: TableIdentifier,
- stagingDir: String,
- table: Table): Unit = {
- val sparkSession = SparkSession.builder().getOrCreate()
- import sparkSession.sqlContext.implicits._
+ spark: SparkSession,
+ sourceTableIdent: TableIdentifier,
+ targetTable: Table,
+ stagingDir: String): Unit = {
- val dbName = source.database.getOrElse("default")
- val tableName = source.table
+ val catalog = spark.sessionState.catalog
- if (!sparkSession.catalog.tableExists(dbName, tableName)) {
-      throw new NoSuchTableException(s"Table $dbName.$tableName does not exist")
+ val db = sourceTableIdent.database.getOrElse(catalog.getCurrentDatabase)
+ val sourceTableIdentWithDB = sourceTableIdent.copy(database = Some(db))
+
+ if (!catalog.tableExists(sourceTableIdentWithDB)) {
+      throw new NoSuchTableException(s"Table $sourceTableIdentWithDB does not exist")
}
- val partitionSpec = SparkSchemaUtil.specForTable(sparkSession,
s"$dbName.$tableName")
- val conf = sparkSession.sparkContext.hadoopConfiguration
- val serializableConfiguration = new SerializableConfiguration(conf)
- val appender = table.newAppend()
+ val spec = SparkSchemaUtil.specForTable(spark,
sourceTableIdentWithDB.unquotedString)
- if (partitionSpec == PartitionSpec.unpartitioned) {
- val catalogTable =
sparkSession.sessionState.catalog.getTableMetadata(source)
- val files = listPartition(Map.empty[String, String],
catalogTable.location.toString,
- catalogTable.storage.serde.getOrElse("none"))
- files.foreach{f =>
appender.appendFile(f.toDataFile(PartitionSpec.unpartitioned))}
+ if (spec == PartitionSpec.unpartitioned) {
+ importUnpartitionedSparkTable(spark, sourceTableIdentWithDB, targetTable)
} else {
- val partitions = partitionDF(sparkSession, s"$dbName.$tableName")
- val manifests = partitions.flatMap { row =>
- listPartition(row.getMap[String, String](0).toMap, row.getString(1),
row.getString(2))
- }.repartition(sparkSession.sessionState.conf.numShufflePartitions)
- .orderBy($"path")
- .mapPartitions {
- files => buildManifest(serializableConfiguration, files.toSeq,
partitionSpec, stagingDir)
- }.collect().map(_.toManifestFile)
- manifests.foreach(appender.appendManifest)
+ importPartitionedSparkTable(spark, sourceTableIdentWithDB, targetTable,
spec, stagingDir)
}
+ }
+
+ private def importUnpartitionedSparkTable(
+ spark: SparkSession,
+ sourceTableIdent: TableIdentifier,
+ targetTable: Table): Unit = {
+
+ val sourceTable =
spark.sessionState.catalog.getTableMetadata(sourceTableIdent)
+ val format = sourceTable.storage.serde.orElse(sourceTable.provider)
+ require(format.nonEmpty, "Could not determine table format")
+
+ val conf = spark.sparkContext.hadoopConfiguration
+ val metricsConfig = MetricsConfig.fromProperties(targetTable.properties)
+
+ val files = listPartition(Map.empty, sourceTable.location.toString,
format.get, conf, metricsConfig)
- appender.commit()
+ val append = targetTable.newAppend()
+ files.foreach(file =>
append.appendFile(file.toDataFile(PartitionSpec.unpartitioned)))
+ append.commit()
}
-}
+ private def importPartitionedSparkTable(
+ spark: SparkSession,
+ sourceTableIdent: TableIdentifier,
+ targetTable: Table,
+ spec: PartitionSpec,
+ stagingDir: String): Unit = {
+
+ import spark.implicits._
+ val conf = spark.sparkContext.hadoopConfiguration
+ val serializableConf = new SerializableConfiguration(conf)
+ val partitions = getPartitions(spark, sourceTableIdent)
+ val parallelism = Math.min(partitions.size,
spark.sessionState.conf.parallelPartitionDiscoveryParallelism)
+ val partitionDS = spark.sparkContext.parallelize(partitions,
parallelism).toDS()
Review comment:
I think it is important to parallelize the listing operation.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]