This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e02f035848 [core] spark: support format table write (#6365)
e02f035848 is described below
commit e02f035848a862cd8456ca48c029b824c5de1ec8
Author: jerry <[email protected]>
AuthorDate: Thu Oct 16 20:34:14 2025 +0800
[core] spark: support format table write (#6365)
---
.../fs/MultiPartUploadTwoPhaseOutputStream.java | 2 +
.../paimon/fs/RenamingTwoPhaseOutputStream.java | 2 +
.../org/apache/paimon/fs/TwoPhaseOutputStream.java | 3 +-
.../org/apache/paimon/catalog/CatalogTestBase.java | 38 ++++--
.../format/orc/writer/RowDataVectorizer.java | 2 +-
.../paimon/format/text/HadoopCompressionUtils.java | 7 +
.../spark/sql/execution/PaimonFormatTable.scala | 152 ++++++++++++++++++++-
.../paimon/spark/table/PaimonFormatTableTest.scala | 65 +++++----
8 files changed, 225 insertions(+), 46 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
index 362635b4df..2bfd0bec72 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/MultiPartUploadTwoPhaseOutputStream.java
@@ -158,6 +158,8 @@ public abstract class MultiPartUploadTwoPhaseOutputStream<T, C> extends TwoPhase
private static class MultiPartUploadCommitter<T, C> implements Committer {
+ private static final long serialVersionUID = 1L;
+
private final MultiPartUploadStore<T, C> multiPartUploadStore;
private final String uploadId;
private final String objectName;
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
index 0da827d920..48a7c4f912 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/RenamingTwoPhaseOutputStream.java
@@ -96,6 +96,8 @@ public class RenamingTwoPhaseOutputStream extends TwoPhaseOutputStream {
/** Committer implementation that renames temporary file to target path. */
private static class TempFileCommitter implements Committer {
+ private static final long serialVersionUID = 1L;
+
private final FileIO fileIO;
private final Path tempPath;
private final Path targetPath;
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
index 401486e91b..3b8c18a70e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/TwoPhaseOutputStream.java
@@ -19,6 +19,7 @@
package org.apache.paimon.fs;
import java.io.IOException;
+import java.io.Serializable;
/** TwoPhaseOutputStream provides a way to write to a file and get a committer that can commit. */
public abstract class TwoPhaseOutputStream extends PositionOutputStream {
@@ -35,7 +36,7 @@ public abstract class TwoPhaseOutputStream extends PositionOutputStream {
public abstract Committer closeForCommit() throws IOException;
/** A committer interface that can commit or discard the written data. */
- public interface Committer {
+ public interface Committer extends Serializable {
/**
* Commits the written data, making it visible.
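As context for this change, a minimal sketch (not taken from the patch) of how the two-phase contract is used, assuming a TwoPhaseOutputStream `out` has already been opened by some FileIO implementation (how it is obtained is elided): the task side writes and calls closeForCommit(), and the returned Committer is shipped to the driver, which is why the interface now extends Serializable.

    import org.apache.paimon.fs.TwoPhaseOutputStream

    // task side: write the data, then close for commit instead of close()
    val out: TwoPhaseOutputStream = ??? // assumed to come from a FileIO implementation
    out.write("1|hello\n".getBytes("UTF-8"))
    val committer: TwoPhaseOutputStream.Committer = out.closeForCommit()

    // driver side: the committer arrives inside a serialized commit message
    committer.commit()   // makes the written file visible
    // committer.discard() would drop the temporary data on abort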
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index a515e96ae0..ea3e2ebd86 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -583,7 +583,6 @@ public abstract class CatalogTestBase {
Random random = new Random();
String dbName = "test_db";
catalog.createDatabase(dbName, true);
- HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f1", DataTypes.INT());
schemaBuilder.column("f2", DataTypes.INT());
@@ -591,14 +590,20 @@ public abstract class CatalogTestBase {
schemaBuilder.column("dt2", DataTypes.VARCHAR(64));
schemaBuilder.partitionKeys("dt", "dt2");
schemaBuilder.option("type", "format-table");
- schemaBuilder.option("file.compression", compressionType.value());
schemaBuilder.option("format-table.partition-path-only-value", "true");
- String[] formats = {"csv", "parquet", "json"};
+ Pair[] format2Compressions = {
+ Pair.of("csv", HadoopCompressionType.GZIP),
+ Pair.of("parquet", HadoopCompressionType.ZSTD),
+ Pair.of("json", HadoopCompressionType.GZIP),
+ Pair.of("orc", HadoopCompressionType.ZSTD)
+ };
int dtPartitionValue = 10;
String dt2PartitionValue = "2022-01-01";
- for (String format : formats) {
- Identifier identifier = Identifier.create(dbName, "partition_table_" + format);
- schemaBuilder.option("file.format", format);
+ for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
+ Identifier identifier =
+ Identifier.create(dbName, "partition_table_" + format2Compression.getKey());
+ schemaBuilder.option("file.compression", format2Compression.getValue().value());
+ schemaBuilder.option("file.format", format2Compression.getKey());
catalog.createTable(identifier, schemaBuilder.build(), true);
FormatTable table = (FormatTable) catalog.getTable(identifier);
int size = 5;
@@ -619,7 +624,7 @@ public abstract class CatalogTestBase {
partitionSpec.put("dt2", dt2PartitionValue + 1);
List<InternalRow> readFilterData = read(table, null, null, partitionSpec, null);
assertThat(readFilterData).isEmpty();
- catalog.dropTable(Identifier.create(dbName, format), true);
+ catalog.dropTable(identifier, true);
}
}
@@ -633,21 +638,26 @@ public abstract class CatalogTestBase {
String dbName = "test_db";
catalog.createDatabase(dbName, true);
int partitionValue = 10;
- HadoopCompressionType compressionType = HadoopCompressionType.GZIP;
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f1", DataTypes.INT());
schemaBuilder.column("f2", DataTypes.INT());
schemaBuilder.column("dt", DataTypes.INT());
schemaBuilder.option("type", "format-table");
schemaBuilder.option("target-file-size", "1 kb");
- schemaBuilder.option("file.compression", compressionType.value());
- String[] formats = {"csv", "parquet", "json"};
- for (String format : formats) {
+ Pair[] format2Compressions = {
+ Pair.of("csv", HadoopCompressionType.GZIP),
+ Pair.of("parquet", HadoopCompressionType.ZSTD),
+ Pair.of("json", HadoopCompressionType.GZIP),
+ Pair.of("orc", HadoopCompressionType.ZSTD)
+ };
+ for (Pair<String, HadoopCompressionType> format2Compression : format2Compressions) {
if (partitioned) {
schemaBuilder.partitionKeys("dt");
}
- Identifier identifier = Identifier.create(dbName, "table_" + format);
- schemaBuilder.option("file.format", format);
+ Identifier identifier =
+ Identifier.create(dbName, "table_" + format2Compression.getKey());
+ schemaBuilder.option("file.format", format2Compression.getKey());
+ schemaBuilder.option("file.compression", format2Compression.getValue().value());
catalog.createTable(identifier, schemaBuilder.build(), true);
FormatTable table = (FormatTable) catalog.getTable(identifier);
int[] projection = new int[] {1, 2};
@@ -696,7 +706,7 @@ public abstract class CatalogTestBase {
read(table, partitionFilterPredicate, projection, null, null);
assertThat(readPartitionAndNoPartitionFilterData).hasSize(size);
}
- catalog.dropTable(Identifier.create(dbName, format), true);
+ catalog.dropTable(identifier, true);
}
}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
index 5cb37f191a..e36fbdfee9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/RowDataVectorizer.java
@@ -53,7 +53,7 @@ public class RowDataVectorizer extends Vectorizer<InternalRow> {
@Override
public void vectorize(InternalRow row, VectorizedRowBatch batch) {
int rowId = batch.size++;
- for (int i = 0; i < row.getFieldCount(); ++i) {
+ for (int i = 0; i < fieldNames.length; ++i) {
ColumnVector fieldColumn = batch.cols[i];
if (row.isNullAt(i)) {
if (!isNullable[i]) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
index 7ff2f97e17..0a292c4d8b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
@@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -101,6 +102,12 @@ public class HadoopCompressionUtils {
Class<?> codecClass = Class.forName(codecName);
CompressionCodec codec =
(CompressionCodec)
codecClass.getDeclaredConstructor().newInstance();
+
+ // Set a Configuration to avoid an NPE when the codec implements Configurable
+ if (codec instanceof Configurable) {
+ ((Configurable) codec).setConf(new Configuration());
+ }
+
codec.createOutputStream(new java.io.ByteArrayOutputStream());
return Optional.of(codec);
} catch (Exception | UnsatisfiedLinkError e) {
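Why the Configurable check above is needed, as a sketch only: Hadoop's GzipCodec is used here purely as an example of a codec instantiated by class name, and which concrete codecs actually fail without a Configuration depends on the Hadoop version. The point is that a reflectively created codec starts with a null conf, and injecting one before createOutputStream avoids the NPE this patch fixes.

    import org.apache.hadoop.conf.{Configurable, Configuration}
    import org.apache.hadoop.io.compress.CompressionCodec

    val codec = Class
      .forName("org.apache.hadoop.io.compress.GzipCodec") // example codec class name
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[CompressionCodec]

    // Codecs created via reflection have no Configuration yet
    codec match {
      case configurable: Configurable => configurable.setConf(new Configuration())
      case _ => // codec does not consult a Configuration
    }
    codec.createOutputStream(new java.io.ByteArrayOutputStream())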
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 5167c4c1bd..aa444aba92 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -18,17 +18,22 @@
package org.apache.spark.sql.execution
-import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkTypeUtils}
+import org.apache.paimon.fs.TwoPhaseOutputStream
+import org.apache.paimon.spark.{PaimonFormatTableScanBuilder, SparkInternalRowWrapper, SparkTypeUtils}
import org.apache.paimon.table.FormatTable
+import org.apache.paimon.table.format.{FormatBatchWriteBuilder, TwoPhaseCommitMessage}
+import org.apache.paimon.table.sink.BatchTableWrite
import org.apache.hadoop.fs.Path
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.connector.catalog.{SupportsPartitionManagement, SupportsRead, SupportsWrite, TableCapability}
-import org.apache.spark.sql.connector.catalog.TableCapability.BATCH_READ
+import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE}
import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, LogicalWriteInfo, PhysicalWriteInfo, Write, WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.streaming.StreamingWrite
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.csv.{CSVScanBuilder, CSVTable}
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -186,7 +191,7 @@ case class PaimonFormatTable(
}
override def capabilities(): util.Set[TableCapability] = {
- util.EnumSet.of(BATCH_READ)
+ util.EnumSet.of(BATCH_READ, BATCH_WRITE)
}
override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
@@ -194,7 +199,7 @@ case class PaimonFormatTable(
}
override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
- throw new UnsupportedOperationException()
+ PaimonFormatTableWriterBuilder(table, schema)
}
}
@@ -297,3 +302,140 @@ class PartitionedJsonTable(
partitionSchema())
}
}
+
+case class PaimonFormatTableWriterBuilder(table: FormatTable, writeSchema: StructType)
+ extends WriteBuilder {
+ override def build: Write = new Write() {
+ override def toBatch: BatchWrite = {
+ FormatTableBatchWrite(table, writeSchema)
+ }
+
+ override def toStreaming: StreamingWrite = {
+ throw new UnsupportedOperationException("FormatTable doesn't support streaming write")
+ }
+ }
+}
+
+private case class FormatTableBatchWrite(table: FormatTable, writeSchema: StructType)
+ extends BatchWrite
+ with Logging {
+
+ override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
+ FormatTableWriterFactory(table, writeSchema)
+
+ override def useCommitCoordinator(): Boolean = false
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Committing to FormatTable ${table.name()}")
+
+ val committers = messages
+ .collect {
+ case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+ case other =>
+ throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
+ }
+ .flatten
+ .toSeq
+
+ try {
+ val start = System.currentTimeMillis()
+ committers.foreach(_.commit())
+ logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+ } catch {
+ case e: Exception =>
+ logError("Failed to commit FormatTable writes", e)
+ throw e
+ }
+ }
+
+ override def abort(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"Aborting write to FormatTable ${table.name()}")
+ val committers = messages.collect {
+ case taskCommit: FormatTableTaskCommit => taskCommit.committers()
+ }.flatten
+
+ committers.foreach {
+ committer =>
+ try {
+ committer.discard()
+ } catch {
+ case e: Exception => logWarning(s"Failed to abort committer: ${e.getMessage}")
+ }
+ }
+ }
+}
+
+private case class FormatTableWriterFactory(table: FormatTable, writeSchema: StructType)
+ extends DataWriterFactory {
+
+ override def createWriter(partitionId: Int, taskId: Long): DataWriter[InternalRow] = {
+ val formatTableWrite = table.newBatchWriteBuilder().newWrite()
+ new FormatTableDataWriter(table, formatTableWrite, writeSchema)
+ }
+}
+
+private class FormatTableDataWriter(
+ table: FormatTable,
+ formatTableWrite: BatchTableWrite,
+ writeSchema: StructType)
+ extends DataWriter[InternalRow]
+ with Logging {
+
+ private val rowConverter: InternalRow => org.apache.paimon.data.InternalRow = {
+ val numFields = writeSchema.fields.length
+ record => {
+ new SparkInternalRowWrapper(-1, writeSchema, numFields).replace(record)
+ }
+ }
+
+ override def write(record: InternalRow): Unit = {
+ val paimonRow = rowConverter.apply(record)
+ formatTableWrite.write(paimonRow)
+ }
+
+ override def commit(): WriterCommitMessage = {
+ try {
+ val committers = formatTableWrite
+ .prepareCommit()
+ .asScala
+ .map {
+ case committer: TwoPhaseCommitMessage => committer.getCommitter
+ case other =>
+ throw new IllegalArgumentException(
+ "Unsupported commit message type: " +
other.getClass.getSimpleName)
+ }
+ .toSeq
+ FormatTableTaskCommit(committers)
+ } finally {
+ close()
+ }
+ }
+
+ override def abort(): Unit = {
+ logInfo("Aborting FormatTable data writer")
+ close()
+ }
+
+ override def close(): Unit = {
+ try {
+ formatTableWrite.close()
+ } catch {
+ case e: Exception =>
+ logError("Error closing FormatTableDataWriter", e)
+ throw new RuntimeException(e)
+ }
+ }
+}
+
+/** Commit message container for FormatTable writes, holding committers that need to be executed. */
+class FormatTableTaskCommit private (private val _committers: Seq[TwoPhaseOutputStream.Committer])
+ extends WriterCommitMessage {
+
+ def committers(): Seq[TwoPhaseOutputStream.Committer] = _committers
+}
+
+object FormatTableTaskCommit {
+ def apply(committers: Seq[TwoPhaseOutputStream.Committer]): FormatTableTaskCommit = {
+ new FormatTableTaskCommit(committers)
+ }
+}
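Taken together, the BATCH_WRITE capability plus the writer classes above mean a Paimon format table can now be written with plain Spark SQL. A rough usage sketch, mirroring the updated tests below (table and column names here are illustrative, and a Spark session configured with the Paimon catalog is assumed):

    // create a CSV format table backed by the Paimon implementation
    spark.sql(
      "CREATE TABLE fmt_demo (id INT, name STRING) USING csv " +
        "TBLPROPERTIES ('file.compression'='none', 'format-table.implementation'='paimon')")

    // INSERT now goes through FormatTableBatchWrite / FormatTableDataWriter
    spark.sql("INSERT INTO fmt_demo VALUES (1, 'a'), (2, 'b')")
    spark.sql("SELECT * FROM fmt_demo ORDER BY id").show()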
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index c01898a7c8..a78813bc0f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -18,7 +18,6 @@
package org.apache.paimon.spark.table
-import org.apache.paimon.CoreOptions
import org.apache.paimon.catalog.Identifier
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
@@ -39,10 +38,9 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
val tableName = "paimon_format_test_csv_malformed"
withTable(tableName) {
sql(
- s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV
OPTIONS ('" +
- s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
- s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
-
.key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')
PARTITIONED BY (`ds` bigint)")
+ s"CREATE TABLE $tableName (f0 INT, f1 string, f2 INT) USING CSV
TBLPROPERTIES (" +
+ s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+ "'format-table.implementation'='paimon') PARTITIONED BY (`ds`
bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
val partition = 20250920
@@ -67,18 +65,15 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
val tableName = "paimon_format_test_csv_options"
withTable(tableName) {
sql(
- s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV OPTIONS ('" +
- s"file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
- s"'${CoreOptions.FORMAT_TABLE_IMPLEMENTATION
-
.key()}'='${CoreOptions.FormatTableImplementation.PAIMON.toString}')
PARTITIONED BY (`ds` bigint)")
+ s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES
(" +
+ s"'file.compression'='none', 'seq'='|', 'lineSep'='\n', " +
+ "'format-table.implementation'='paimon') PARTITIONED BY (`ds`
bigint)")
val table =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+ table.fileIO().mkdirs(new Path(table.location()))
val partition = 20250920
- val csvFile =
- new Path(
- table.location(),
- s"ds=$partition/part-00000-0a28422e-68ba-4713-8870-2fde2d36ed06-c001.csv")
- table.fileIO().writeFile(csvFile, "1|asfasdfsdf\n2|asfasdfsdf", false)
+ sql(
+ s"INSERT INTO $tableName VALUES (1, 'asfasdfsdf', $partition), (2,
'asfasdfsdf', $partition)")
checkAnswer(
sql(s"SELECT * FROM $tableName"),
Seq(Row(1, "asfasdfsdf", partition), Row(2, "asfasdfsdf", partition))
@@ -86,7 +81,31 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
}
}
- test("PaimonFormatTableRead: read non-partitioned table") {
+ test("PaimonFormatTable: csv with partition path only value") {
+ val tableName = "paimon_format_test_partition_path_only_value"
+ withTable(tableName) {
+ sql(
+ s"CREATE TABLE $tableName (f0 INT, f1 string) USING CSV TBLPROPERTIES
(" +
+ s"'file.compression'='none','format-table.implementation'='paimon',"
+
+ "'format-table.partition-path-only-value'='true') PARTITIONED BY
(`ds` bigint)")
+ val table =
+ paimonCatalog.getTable(Identifier.create("test_db", tableName)).asInstanceOf[FormatTable]
+ table.fileIO().mkdirs(new Path(table.location()))
+ val partition = 20250920
+ sql(
+ s"INSERT INTO $tableName VALUES (1, 'asfasdfsdf', $partition), (2,
'asfasdfsdf', $partition)")
+ checkAnswer(
+ sql(s"SELECT ds, f0 FROM $tableName"),
+ Seq(Row(partition, 1), Row(partition, 2))
+ )
+ checkAnswer(
+ sql(s"SELECT ds, f0 FROM $tableName where ds = $partition order by f0
limit 1"),
+ Seq(Row(partition, 1))
+ )
+ }
+ }
+
+ test("PaimonFormatTable: non-partitioned table") {
for {
(format, compression) <- Seq(
("csv", "gzip"),
@@ -99,7 +118,8 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
// Create format table using the same pattern as FormatTableTestBase
sql(
s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE) USING
$format " +
- s"TBLPROPERTIES ('file.compression'='$compression', 'seq'=',',
'lineSep'='\n')")
+ s"TBLPROPERTIES ('file.compression'='$compression', 'seq'=',',
'lineSep'='\n'," +
+ " 'format-table.implementation'='paimon')")
val path =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).options().get("path")
fileIO.mkdirs(new Path(path))
@@ -109,9 +129,6 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
sql(s"INSERT INTO $tableName VALUES (3, 'Charlie', 30.9)")
// Test reading all data
- sql(
- s"Alter table $tableName SET TBLPROPERTIES
('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
- s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
checkAnswer(
sql(s"SELECT * FROM $tableName ORDER BY id"),
Seq(
@@ -143,7 +160,7 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
}
}
- test("PaimonFormatTableRead: read partitioned table") {
+ test("PaimonFormatTable: partitioned table") {
for {
(format, compression) <- Seq(
("csv", "gzip"),
@@ -155,8 +172,9 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
withTable(tableName) {
// Create partitioned format table
sql(
- s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE, dept
STRING) USING $format " +
- s"PARTITIONED BY (dept) TBLPROPERTIES
('file.compression'='$compression')")
+ s"CREATE TABLE $tableName (id INT, name STRING, value DOUBLE) USING
$format " +
+ s"PARTITIONED BY (dept STRING) TBLPROPERTIES
('file.compression'='$compression'," +
+ " 'format-table.implementation'='paimon')")
val paimonTable = paimonCatalog.getTable(Identifier.create("test_db", tableName))
val path =
paimonCatalog.getTable(Identifier.create("test_db", tableName)).options().get("path")
@@ -170,9 +188,6 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
s" (5, 'Eve', 15.8, 'Marketing')")
// Test reading all data
- sql(
- s"Alter table $tableName SET TBLPROPERTIES
('${CoreOptions.FORMAT_TABLE_IMPLEMENTATION.key()}'" +
- s"='${CoreOptions.FormatTableImplementation.PAIMON.toString}')")
checkAnswer(
sql(s"SELECT * FROM $tableName ORDER BY id"),
Seq(