This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1c5ff5ae6a [core] format table: update format-table.implementation default value from engine to paimon (#6474)
1c5ff5ae6a is described below
commit 1c5ff5ae6a0c55e80f79f2680258242ea62c48ab
Author: jerry <[email protected]>
AuthorDate: Tue Oct 28 13:58:53 2025 +0800
[core] format table: update format-table.implementation default value from engine to paimon (#6474)
---
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../format/parquet/ParquetWriterFactory.java | 4 +++
.../paimon/spark/format/PaimonFormatTable.scala | 14 ++++++++-
.../paimon/spark/SparkCatalogWithHiveTest.java | 13 +++++----
.../paimon/spark/sql/FormatTableTestBase.scala | 34 +++++++++++++++++-----
6 files changed, 53 insertions(+), 16 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 1889c9461c..6b3a8fbfb5 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -538,7 +538,7 @@ under the License.
</tr>
<tr>
<td><h5>format-table.implementation</h5></td>
- <td style="word-wrap: break-word;">engine</td>
+ <td style="word-wrap: break-word;">paimon</td>
<td><p>Enum</p></td>
<td>Format table uses paimon or engine.<br /><br />Possible values:<ul><li>"paimon": Paimon format table implementation.</li><li>"engine": Engine format table implementation.</li></ul></td>
</tr>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 26af349006..267d1255a3 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1998,7 +1998,7 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<FormatTableImplementation> FORMAT_TABLE_IMPLEMENTATION =
key("format-table.implementation")
.enumType(FormatTableImplementation.class)
- .defaultValue(FormatTableImplementation.ENGINE)
+ .defaultValue(FormatTableImplementation.PAIMON)
.withDescription("Format table uses paimon or engine.");
public static final ConfigOption<Boolean> FORMAT_TABLE_PARTITION_ONLY_VALUE_IN_PATH =
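
For context, this flip means an unset key now resolves to PAIMON, so deployments that still want the engine-native code path must opt back in explicitly. A minimal sketch of both cases, assuming the public org.apache.paimon.options.Options API (the enum values are the ones documented above; whether FormatTableImplementation is nested in CoreOptions is not shown in this diff):

    import java.util.Collections;

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    public class FormatTableImplementationDefault {
        public static void main(String[] args) {
            // Unset: now resolves to the new default, PAIMON (was ENGINE).
            Options unset = new Options();
            System.out.println(unset.get(CoreOptions.FORMAT_TABLE_IMPLEMENTATION));

            // Explicit opt-out back to the engine-native implementation,
            // using the string key defined above.
            Options engine =
                    Options.fromMap(
                            Collections.singletonMap("format-table.implementation", "engine"));
            System.out.println(engine.get(CoreOptions.FORMAT_TABLE_IMPLEMENTATION));
        }
    }
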
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
index e91f9096ea..006c2121aa 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetWriterFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.format.parquet;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.HadoopCompressionType;
import org.apache.paimon.format.parquet.writer.ParquetBuilder;
import org.apache.paimon.format.parquet.writer.ParquetBulkWriter;
import org.apache.paimon.format.parquet.writer.StreamOutputFile;
@@ -49,6 +50,9 @@ public class ParquetWriterFactory implements FormatWriterFactory {
@Override
public FormatWriter create(PositionOutputStream stream, String compression) throws IOException {
final OutputFile out = new StreamOutputFile(stream);
+ if (HadoopCompressionType.NONE.value().equals(compression)) {
+ compression = null;
+ }
final ParquetWriter<InternalRow> writer = writerBuilder.createWriter(out, compression);
return new ParquetBulkWriter(writer);
}
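
The new guard matters because a literal "none" is not a member of Parquet's codec enum, while a null codec name is what resolves to UNCOMPRESSED. A standalone illustration of the lookup being worked around, assuming the writer builder downstream resolves the codec via parquet-hadoop's CompressionCodecName.fromConf:

    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class ParquetCompressionLookup {
        public static void main(String[] args) {
            // null is the spelling Parquet understands as "no compression"...
            System.out.println(CompressionCodecName.fromConf(null));   // UNCOMPRESSED
            System.out.println(CompressionCodecName.fromConf("zstd")); // ZSTD

            // ...whereas the literal "none" fails the enum lookup, which is
            // exactly the case the new null-mapping in create(...) avoids.
            try {
                CompressionCodecName.fromConf("none");
            } catch (IllegalArgumentException e) {
                System.out.println("codec lookup failed for \"none\"");
            }
        }
    }
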
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 3685e26ee6..421dc6206c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.format
+import org.apache.paimon.format.csv.CsvOptions
import org.apache.paimon.fs.TwoPhaseOutputStream
import org.apache.paimon.spark.{BaseTable, FormatTableScanBuilder, SparkInternalRowWrapper}
import org.apache.paimon.spark.write.BaseV2WriteBuilder
@@ -28,7 +29,7 @@ import org.apache.paimon.types.RowType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability}
+import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, TableCapability, TableCatalog}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, OVERWRITE_BY_FILTER, OVERWRITE_DYNAMIC}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write._
@@ -50,6 +51,17 @@ case class PaimonFormatTable(table: FormatTable)
util.EnumSet.of(BATCH_READ, BATCH_WRITE, OVERWRITE_DYNAMIC, OVERWRITE_BY_FILTER)
}
+ override def properties: util.Map[String, String] = {
+ val properties = new util.HashMap[String, String](table.options())
+ if (table.comment.isPresent) {
+ properties.put(TableCatalog.PROP_COMMENT, table.comment.get)
+ }
+ if (FormatTable.Format.CSV == table.format) {
+ properties.put("sep", properties.get(CsvOptions.FIELD_DELIMITER.key()))
+ }
+ properties
+ }
+
override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
val scanBuilder = FormatTableScanBuilder(table.copy(caseInsensitiveStringMap))
scanBuilder.pruneColumns(schema)
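
The properties override above is what the DESCRIBE-based assertion in SparkCatalogWithHiveTest below exercises: for CSV tables the Paimon delimiter option is mirrored under "sep", the key Spark's CSV source reads. A stripped-down sketch of that mapping, assuming CsvOptions.FIELD_DELIMITER's key is "csv.field-delimiter" (as the tests suggest) and that Spark's TableCatalog.PROP_COMMENT is the string "comment":

    import java.util.HashMap;
    import java.util.Map;

    public class FormatTablePropertiesSketch {
        static Map<String, String> toSparkProperties(
                Map<String, String> paimonOptions, String format, String comment) {
            Map<String, String> props = new HashMap<>(paimonOptions);
            if (comment != null) {
                props.put("comment", comment); // TableCatalog.PROP_COMMENT
            }
            if ("csv".equalsIgnoreCase(format)) {
                // Mirror the Paimon option under the key Spark reads.
                props.put("sep", props.get("csv.field-delimiter"));
            }
            return props;
        }
    }
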
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 980944b828..bcd69cdeb7 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -78,7 +78,7 @@ public class SparkCatalogWithHiveTest {
.stream()
.map(s -> s.get(0))
.map(Object::toString)
- .filter(s -> s.contains("OrcScan"))
+ .filter(s -> s.contains("PaimonFormatTableScan"))
.count())
.isGreaterThan(0);
@@ -89,10 +89,11 @@ public class SparkCatalogWithHiveTest {
// test csv table
spark.sql(
- "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c
STRING) USING csv OPTIONS ('csv.field-delimiter' ',')");
+ "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c
STRING) USING csv OPTIONS ('csv.field-delimiter' ',') "
+ + "TBLPROPERTIES ('file.compression'='none')");
spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2,
'2')").collect();
- assertThat(spark.sql("DESCRIBE FORMATTED
table_csv").collectAsList().toString())
- .contains("sep=,");
+ String r = spark.sql("DESCRIBE FORMATTED
table_csv").collectAsList().toString();
+ assertThat(r).contains("sep=,");
assertThat(
spark.sql("SELECT * FROM
table_csv").collectAsList().stream()
.map(Row::toString)
@@ -101,7 +102,9 @@ public class SparkCatalogWithHiveTest {
// test json table
- spark.sql("CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c
STRING) USING json");
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c
STRING) USING json "
+ + "TBLPROPERTIES ('file.compression'='none')");
spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
assertThat(
spark.sql("SELECT * FROM
table_json").collectAsList().stream()
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 02dd3a2444..cb469f4da0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -35,7 +35,8 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
test("Format table: csv with field-delimiter") {
withTable("t") {
- sql(s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS
('csv.field-delimiter' ';')")
+ sql(
+ s"CREATE TABLE t (f0 INT, f1 INT) USING CSV OPTIONS
('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
val table =
paimonCatalog.getTable(Identifier.create(hiveDbName, "t")).asInstanceOf[FormatTable]
val csvFile =
@@ -46,9 +47,16 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
}
test("Format table: write partitioned table") {
- for (format <- Seq("csv", "orc", "parquet", "json")) {
+ for (
+ (format, compression) <- Seq(
+ ("csv", "gzip"),
+ ("orc", "zlib"),
+ ("parquet", "zstd"),
+ ("json", "none"))
+ ) {
withTable("t") {
- sql(s"CREATE TABLE t (id INT, p1 INT, p2 INT) USING $format
PARTITIONED BY (p1, p2)")
+ sql(
+ s"CREATE TABLE t (id INT, p1 INT, p2 INT) USING $format PARTITIONED
BY (p1, p2) TBLPROPERTIES ('file.compression'='$compression')")
sql("INSERT INTO t VALUES (1, 2, 3)")
// check show create table
@@ -71,9 +79,16 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
}
test("Format table: show partitions") {
- for (format <- Seq("csv", "orc", "parquet", "json")) {
+ for (
+ (format, compression) <- Seq(
+ ("csv", "gzip"),
+ ("orc", "zlib"),
+ ("parquet", "zstd"),
+ ("json", "none"))
+ ) {
withTable("t") {
- sql(s"CREATE TABLE t (id INT, p1 INT, p2 STRING) USING $format
PARTITIONED BY (p1, p2)")
+ sql(
+ s"CREATE TABLE t (id INT, p1 INT, p2 STRING) USING $format
PARTITIONED BY (p1, p2) TBLPROPERTIES ('file.compression'='$compression')")
sql("INSERT INTO t VALUES (1, 1, '1')")
sql("INSERT INTO t VALUES (2, 1, '1')")
sql("INSERT INTO t VALUES (3, 2, '1')")
@@ -92,7 +107,8 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
test("Format table: CTAS with partitioned table") {
withTable("t1", "t2") {
- sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY
(p1, p2)")
+ sql(
+ "CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY
(p1, p2) TBLPROPERTIES ('file.compression'='none')")
sql("INSERT INTO t1 VALUES (1, 2, 3)")
assertThrows[UnsupportedOperationException] {
@@ -109,7 +125,8 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
test("Format table: read compressed files") {
for (format <- Seq("csv", "json")) {
withTable("compress_t") {
- sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format")
+ sql(
+ s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format
TBLPROPERTIES ('file.compression'='none')")
sql("INSERT INTO compress_t VALUES (1, 2, 3)")
val table =
paimonCatalog
@@ -132,7 +149,8 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
test("Format table: field delimiter in HMS") {
withTable("t1") {
- sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS
('csv.field-delimiter' ';')")
+ sql(
+ "CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv OPTIONS
('csv.field-delimiter' ';') TBLPROPERTIES ('file.compression'='none')")
val row = sql("SHOW CREATE TABLE t1").collect()(0)
assert(row.toString().contains("'csv.field-delimiter' = ';'"))
}
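
As a quick reference, these are the codecs the updated tests pin per format through the 'file.compression' table property (pairs taken directly from the test matrix above):

    import java.util.Map;

    public class TestedCompressionMatrix {
        // Format -> 'file.compression' value exercised by the tests above.
        static final Map<String, String> TESTED =
                Map.of("csv", "gzip", "orc", "zlib", "parquet", "zstd", "json", "none");

        public static void main(String[] args) {
            TESTED.forEach((f, c) -> System.out.println(f + " -> " + c));
        }
    }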