This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 5082817dab89bd5b9baceeef844341d81f662337 Author: Zouxxyy <[email protected]> AuthorDate: Mon Apr 21 21:05:29 2025 +0800 [spark] Support partitioned format table (#5502) --- .../java/org/apache/paimon/hive/HiveCatalog.java | 7 +- .../hive/Hive23CatalogFormatTableITCase.java | 6 + .../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++ .../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++ .../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++ .../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++ .../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++ .../java/org/apache/paimon/spark/SparkCatalog.java | 32 ++-- .../spark/sql/execution/PaimonFormatTable.scala | 196 +++++++++++++++++++++ .../paimon/spark/sql/FormatTableTestBase.scala | 58 ++++++ 10 files changed, 391 insertions(+), 13 deletions(-) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 164b27db56..1ef319fb31 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -1467,7 +1467,12 @@ public class HiveCatalog extends AbstractCatalog { sd.setSerdeInfo(serDeInfo); CoreOptions options = new CoreOptions(schema.options()); - if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) { + boolean partitioned = !schema.partitionKeys().isEmpty(); + // Always treat partitioned format table as partitioned table in metastore + if (provider == null && !options.partitionedTableInMetastore()) { + partitioned = false; + } + if (partitioned) { Map<String, DataField> fieldMap = schema.fields().stream() .collect(Collectors.toMap(DataField::name, Function.identity())); diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java index d2e277dd22..0e77a6b67e 100644 --- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java +++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java @@ -49,4 +49,10 @@ public class Hive23CatalogFormatTableITCase extends HiveCatalogFormatTableITCase public void testPartitionTable() { // Need to specify partition columns because the destination table is partitioned. } + + @Override + @Test + public void testFlinkCreatePartitionTable() { + // Need to specify partition columns because the destination table is partitioned. + } } diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala new file mode 100644 index 0000000000..ba49976ab6 --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class FormatTableTest extends FormatTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala new file mode 100644 index 0000000000..ba49976ab6 --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class FormatTableTest extends FormatTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala new file mode 100644 index 0000000000..ba49976ab6 --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class FormatTableTest extends FormatTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala new file mode 100644 index 0000000000..ba49976ab6 --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class FormatTableTest extends FormatTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala new file mode 100644 index 0000000000..ba49976ab6 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class FormatTableTest extends FormatTableTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index f6e29d7651..c323d86dfd 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -31,6 +31,7 @@ import org.apache.paimon.spark.catalog.SupportFunction; import org.apache.paimon.spark.catalog.SupportView; import org.apache.paimon.table.FormatTable; import org.apache.paimon.table.FormatTableOptions; +import org.apache.paimon.utils.TypeUtils; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; @@ -45,15 +46,15 @@ import org.apache.spark.sql.connector.expressions.FieldReference; import org.apache.spark.sql.connector.expressions.IdentityTransform; import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.execution.PartitionedCSVTable; +import org.apache.spark.sql.execution.PartitionedJsonTable; +import org.apache.spark.sql.execution.PartitionedOrcTable; +import org.apache.spark.sql.execution.PartitionedParquetTable; import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat; import org.apache.spark.sql.execution.datasources.json.JsonFileFormat; import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat; import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat; import org.apache.spark.sql.execution.datasources.v2.FileTable; -import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable; -import org.apache.spark.sql.execution.datasources.v2.json.JsonTable; -import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable; -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -464,6 +465,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) { StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType()); + StructType partitionSchema = + SparkTypeUtils.fromPaimonRowType( + TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys())); List<String> pathList = new ArrayList<>(); pathList.add(formatTable.location()); Options options = Options.fromMap(formatTable.options()); @@ -471,37 +475,41 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S if (formatTable.format() == FormatTable.Format.CSV) { options.set("sep", options.get(FormatTableOptions.FIELD_DELIMITER)); dsOptions = new CaseInsensitiveStringMap(options.toMap()); - return new CSVTable( + return new PartitionedCSVTable( ident.name(), SparkSession.active(), dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), - CSVFileFormat.class); + CSVFileFormat.class, + partitionSchema); } else if (formatTable.format() == FormatTable.Format.ORC) { - return new OrcTable( + return new PartitionedOrcTable( ident.name(), SparkSession.active(), dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), - OrcFileFormat.class); + OrcFileFormat.class, + partitionSchema); } else if (formatTable.format() == FormatTable.Format.PARQUET) { - return new ParquetTable( + return new PartitionedParquetTable( ident.name(), SparkSession.active(), dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), - ParquetFileFormat.class); + ParquetFileFormat.class, + partitionSchema); } else if (formatTable.format() == FormatTable.Format.JSON) { - return new JsonTable( + return new PartitionedJsonTable( ident.name(), SparkSession.active(), dsOptions, scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(), scala.Option.apply(schema), - JsonFileFormat.class); + JsonFileFormat.class, + partitionSchema); } else { throw new UnsupportedOperationException( "Unsupported format table " diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala new file mode 100644 index 0000000000..842009b4a5 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable +import org.apache.spark.sql.execution.datasources.v2.json.JsonTable +import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable +import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable +import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import scala.collection.JavaConverters._ + +object PaimonFormatTable { + + // Copy from spark and override FileIndex's partitionSchema + def createFileIndex( + options: CaseInsensitiveStringMap, + sparkSession: SparkSession, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + partitionSchema: StructType): PartitioningAwareFileIndex = { + + def globPaths: Boolean = { + val entry = options.get(DataSource.GLOB_PATHS_KEY) + Option(entry).map(_ == "true").getOrElse(true) + } + + val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap + // Hadoop Configurations are case-sensitive. + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap) + if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) { + // We are reading from the results of a streaming query. We will load files from + // the metadata log instead of listing them using HDFS APIs. + new PartitionedMetadataLogFileIndex( + sparkSession, + new Path(paths.head), + options.asScala.toMap, + userSpecifiedSchema, + partitionSchema = partitionSchema) + } else { + // This is a non-streaming file based datasource. + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary( + paths, + hadoopConf, + checkEmptyGlobPath = true, + checkFilesExist = true, + enableGlobbing = globPaths) + val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + + new PartitionedInMemoryFileIndex( + sparkSession, + rootPathsSpecified, + caseSensitiveMap, + userSpecifiedSchema, + fileStatusCache, + partitionSchema = partitionSchema) + } + } + + // Extend from MetadataLogFileIndex to override partitionSchema + private class PartitionedMetadataLogFileIndex( + sparkSession: SparkSession, + path: Path, + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + override val partitionSchema: StructType) + extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema) + + // Extend from InMemoryFileIndex to override partitionSchema + private class PartitionedInMemoryFileIndex( + sparkSession: SparkSession, + rootPathsSpecified: Seq[Path], + parameters: Map[String, String], + userSpecifiedSchema: Option[StructType], + fileStatusCache: FileStatusCache = NoopCache, + userSpecifiedPartitionSpec: Option[PartitionSpec] = None, + metadataOpsTimeNs: Option[Long] = None, + override val partitionSchema: StructType) + extends InMemoryFileIndex( + sparkSession, + rootPathsSpecified, + parameters, + userSpecifiedSchema, + fileStatusCache, + userSpecifiedPartitionSpec, + metadataOpsTimeNs) +} + +// Paimon Format Table + +class PartitionedCSVTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat], + partitionSchema: StructType +) extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) { + + override lazy val fileIndex: PartitioningAwareFileIndex = { + PaimonFormatTable.createFileIndex( + options, + sparkSession, + paths, + userSpecifiedSchema, + partitionSchema) + } +} + +class PartitionedOrcTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat], + partitionSchema: StructType +) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) { + + override lazy val fileIndex: PartitioningAwareFileIndex = { + PaimonFormatTable.createFileIndex( + options, + sparkSession, + paths, + userSpecifiedSchema, + partitionSchema) + } +} + +class PartitionedParquetTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat], + partitionSchema: StructType +) extends ParquetTable( + name, + sparkSession, + options, + paths, + userSpecifiedSchema, + fallbackFileFormat) { + + override lazy val fileIndex: PartitioningAwareFileIndex = { + PaimonFormatTable.createFileIndex( + options, + sparkSession, + paths, + userSpecifiedSchema, + partitionSchema) + } +} + +class PartitionedJsonTable( + name: String, + sparkSession: SparkSession, + options: CaseInsensitiveStringMap, + paths: Seq[String], + userSpecifiedSchema: Option[StructType], + fallbackFileFormat: Class[_ <: FileFormat], + partitionSchema: StructType) + extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) { + + override lazy val fileIndex: PartitioningAwareFileIndex = { + PaimonFormatTable.createFileIndex( + options, + sparkSession, + paths, + userSpecifiedSchema, + partitionSchema) + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala new file mode 100644 index 0000000000..6bd54a26e3 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.catalog.Identifier +import org.apache.paimon.fs.Path +import org.apache.paimon.spark.PaimonHiveTestBase +import org.apache.paimon.table.FormatTable + +import org.apache.spark.sql.Row + +abstract class FormatTableTestBase extends PaimonHiveTestBase { + + override protected def beforeEach(): Unit = { + sql(s"USE $paimonHiveCatalogName") + } + + test("Format table: write partitioned table") { + for (format <- Seq("csv", "orc", "parquet", "json")) { + withTable("t") { + sql(s"CREATE TABLE t (id INT, p1 INT, p2 INT) USING $format PARTITIONED BY (p1, p2)") + sql("INSERT INTO t VALUES (1, 2, 3)") + + // check show create table + assert( + sql("SHOW CREATE TABLE t").collectAsList().toString.contains("PARTITIONED BY (p1, p2)")) + + // check partition in file system + val table = + paimonCatalog.getTable(Identifier.create("default", "t")).asInstanceOf[FormatTable] + val dirs = table.fileIO().listStatus(new Path(table.location())).map(_.getPath.getName) + assert(dirs.count(_.startsWith("p1=")) == 1) + + // check select + checkAnswer(sql("SELECT * FROM t"), Row(1, 2, 3)) + checkAnswer(sql("SELECT id FROM t"), Row(1)) + checkAnswer(sql("SELECT p1 FROM t"), Row(2)) + checkAnswer(sql("SELECT p2 FROM t"), Row(3)) + } + } + } +}
