This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 4ba5b2d feat: support read to arrow (#116)
4ba5b2d is described below
commit 4ba5b2d86cac216bdceab513df96d2e55a5814c1
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Mar 11 11:24:15 2026 +0800
feat: support read to arrow (#116)
---
.github/workflows/ci.yml | 8 +-
Cargo.toml | 6 +-
Cargo.toml => crates/integration_tests/Cargo.toml | 26 ++--
crates/integration_tests/tests/read_log_tables.rs | 104 +++++++++++++
crates/paimon/Cargo.toml | 6 +-
crates/paimon/src/{lib.rs => arrow/mod.rs} | 14 +-
crates/paimon/src/arrow/reader.rs | 170 ++++++++++++++++++++++
crates/paimon/src/catalog/mod.rs | 1 +
crates/paimon/src/error.rs | 18 +++
crates/paimon/src/lib.rs | 7 +-
crates/paimon/src/spec/types.rs | 11 +-
crates/paimon/src/table/mod.rs | 6 +
crates/paimon/src/table/read_builder.rs | 28 +++-
crates/paimon/src/table/source.rs | 12 ++
deny.toml | 1 +
dev/spark/provision.py | 7 +-
16 files changed, 390 insertions(+), 35 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7135e15..42128d6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -77,7 +77,7 @@ jobs:
- uses: actions/checkout@v4
- name: Test
- run: cargo test --all-targets --workspace
+ run: cargo test -p paimon --all-targets
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full
@@ -90,6 +90,12 @@ jobs:
- name: Start Docker containers
run: make docker-up
+ - name: Integration Test
+ run: cargo test -p paimon-integration-tests --all-targets
+ env:
+ RUST_LOG: DEBUG
+ RUST_BACKTRACE: full
+
- name: Stop Docker containers
if: always()
run: make docker-down
diff --git a/Cargo.toml b/Cargo.toml
index f589335..0ae4fa0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
[workspace]
resolver = "2"
-members = ["crates/paimon"]
+members = ["crates/paimon", "crates/integration_tests"]
[workspace.package]
version = "0.0.0"
@@ -26,3 +26,7 @@ homepage = "https://paimon.apache.org/"
repository = "https://github.com/apache/paimon-rust"
license = "Apache-2.0"
rust-version = "1.86.0"
+
+[workspace.dependencies]
+arrow-array = "57.0"
+parquet = "57.0"
\ No newline at end of file
diff --git a/Cargo.toml b/crates/integration_tests/Cargo.toml
similarity index 59%
copy from Cargo.toml
copy to crates/integration_tests/Cargo.toml
index f589335..a4753bf 100644
--- a/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -9,20 +9,22 @@
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-[workspace]
-resolver = "2"
-members = ["crates/paimon"]
+[package]
+name = "paimon-integration-tests"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/"
-repository = "https://github.com/apache/paimon-rust"
-license = "Apache-2.0"
-rust-version = "1.86.0"
+[dependencies]
+paimon = { path = "../paimon" }
+arrow-array = { workspace = true }
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+futures = "0.3"
diff --git a/crates/integration_tests/tests/read_log_tables.rs
b/crates/integration_tests/tests/read_log_tables.rs
new file mode 100644
index 0000000..e7b42f7
--- /dev/null
+++ b/crates/integration_tests/tests/read_log_tables.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on
+// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for reading Paimon log tables (append-only tables).
+//!
+//! A log table here is a regular Paimon table without primary keys (see
+//! `simple_log_table` in `dev/spark/provision.py`). Its data files must be
+//! Parquet, the only format the Arrow reader supports, and are read via `to_arrow`.
+
+use arrow_array::{Int32Array, StringArray};
+use futures::TryStreamExt;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, FileSystemCatalog};
+
+/// Get the test warehouse path from environment variable or use default.
+fn get_test_warehouse() -> String {
+ std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_|
"/tmp/paimon-warehouse".to_string())
+}
+
+/// Test reading a table and verifying the data matches expected values.
+///
+/// The table was populated with: (1, 'alice'), (2, 'bob'), (3, 'carol')
+#[tokio::test]
+async fn test_read_log_table() {
+ let warehouse = get_test_warehouse();
+ let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create
catalog");
+
+ // Get the table
+ let identifier = Identifier::new("default", "simple_log_table");
+
+ let table = catalog
+ .get_table(&identifier)
+ .await
+ .expect("Failed to get table");
+
+ // Scan the table
+ let read_builder = table.new_read_builder();
+ let read = read_builder.new_read().expect("Failed to create read");
+ let scan = read_builder.new_scan();
+
+ let plan = scan.plan().await.expect("Failed to plan scan");
+
+ // Read to Arrow
+ let stream = read
+ .to_arrow(plan.splits())
+ .expect("Failed to create arrow stream");
+
+ let batches: Vec<_> = stream
+ .try_collect()
+ .await
+ .expect("Failed to collect batches");
+
+ assert!(
+ !batches.is_empty(),
+ "Expected at least one batch from table"
+ );
+
+ // Collect all rows as (id, name) tuples
+ let mut actual_rows: Vec<(i32, String)> = Vec::new();
+
+ for batch in &batches {
+ let id_array = batch
+ .column_by_name("id")
+ .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+ .expect("Expected Int32Array for id column");
+ let name_array = batch
+ .column_by_name("name")
+ .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+ .expect("Expected StringArray for name column");
+
+ for i in 0..batch.num_rows() {
+ actual_rows.push((id_array.value(i),
name_array.value(i).to_string()));
+ }
+ }
+
+ // Expected data: (1, 'alice'), (2, 'bob'), (3, 'carol')
+ let expected_rows = vec![
+ (1, "alice".to_string()),
+ (2, "bob".to_string()),
+ (3, "carol".to_string()),
+ ];
+
+ // Sort for consistent comparison
+ actual_rows.sort_by_key(|(id, _)| *id);
+
+ assert_eq!(
+ actual_rows, expected_rows,
+ "Rows should match expected values"
+ );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index d0050de..4f6c6a6 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -49,9 +49,13 @@ snafu = "0.8.3"
typed-builder = "^0.19"
opendal = { version = "0.49", features = ["services-fs"] }
pretty_assertions = "1"
-apache-avro = { version = "0.17", features = ["snappy"] }
+apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
indexmap = "2.5.0"
roaring = "0.10"
+arrow-array = { workspace = true }
+futures = "0.3"
+parquet = { workspace = true, features = ["async", "zstd"] }
+async-stream = "0.3.6"
[dev-dependencies]
rand = "0.8.5"
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/arrow/mod.rs
similarity index 74%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/arrow/mod.rs
index b3095a3..0ed18b8 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -15,16 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-mod error;
-pub use error::Error;
-pub use error::Result;
+mod reader;
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-pub mod spec;
-mod table;
-pub use table::{
- DataSplit, DataSplitBuilder, Plan, ReadBuilder, SnapshotManager, Table,
TableRead, TableScan,
-};
+pub use crate::arrow::reader::ArrowReaderBuilder;
diff --git a/crates/paimon/src/arrow/reader.rs
b/crates/paimon/src/arrow/reader.rs
new file mode 100644
index 0000000..209874f
--- /dev/null
+++ b/crates/paimon/src/arrow/reader.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::{FileIO, FileRead, FileStatus};
+use crate::table::ArrowRecordBatchStream;
+use crate::{DataSplit, Error};
+use async_stream::try_stream;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{StreamExt, TryFutureExt};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::ParquetMetaDataReader;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::try_join;
+
+/// Builder to create ArrowReader
+pub struct ArrowReaderBuilder {
+ batch_size: Option<usize>,
+ file_io: FileIO,
+}
+
+impl ArrowReaderBuilder {
+ /// Create a new ArrowReaderBuilder
+ pub(crate) fn new(file_io: FileIO) -> Self {
+ ArrowReaderBuilder {
+ batch_size: None,
+ file_io,
+ }
+ }
+
+ /// Build the ArrowReader.
+ pub fn build(self) -> ArrowReader {
+ ArrowReader {
+ batch_size: self.batch_size,
+ file_io: self.file_io,
+ }
+ }
+}
+
+/// Reads data from Parquet files
+#[derive(Clone)]
+pub struct ArrowReader {
+ batch_size: Option<usize>,
+ file_io: FileIO,
+}
+
+impl ArrowReader {
+ /// Read every data file of the given DataSplits, split by split, in order.
+ /// Returns a stream of Arrow RecordBatches; errors eagerly if any file is not `.parquet`.
+ pub fn read(self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
+ let file_io = self.file_io.clone();
+ let batch_size = self.batch_size;
+ let paths_to_read: Vec<String> = data_splits
+ .iter()
+ .flat_map(|ds| ds.data_file_entries().map(|(p, _)| p))
+ .map(|p| {
+ if !p.to_ascii_lowercase().ends_with(".parquet") {
+ Err(Error::Unsupported {
+ message: format!(
+ "unsupported file format: only .parquet is
supported, got: {p}"
+ ),
+ })
+ } else {
+ Ok(p)
+ }
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(try_stream! {
+ for path_to_read in paths_to_read {
+ let parquet_file = file_io.new_input(&path_to_read)?;
+
+ let (parquet_metadata, parquet_reader) = try_join!(
+ parquet_file.metadata(),
+ parquet_file.reader()
+ )?;
+
+ let arrow_file_reader = ArrowFileReader::new(parquet_metadata,
parquet_reader);
+
+ let mut batch_stream_builder =
+ ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+ .await?;
+
+ if let Some(size) = batch_size {
+ batch_stream_builder =
batch_stream_builder.with_batch_size(size);
+ }
+
+ let mut batch_stream = batch_stream_builder.build()?;
+
+ while let Some(batch) = batch_stream.next().await {
+ yield batch?
+ }
+ }
+ }
+ .boxed())
+ }
+}
+
+/// ArrowFileReader is a wrapper around a FileRead that implements parquet's AsyncFileReader.
+///
+/// # TODO
+///
+/// [ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
+/// contains the following hints to speed up metadata loading; like iceberg-rust, we could add them here:
+///
+/// - `metadata_size_hint`: Provide a hint as to the size of the parquet file's footer.
+/// - `preload_column_index`: Load the Column Index as part of [`Self::get_metadata`].
+/// - `preload_offset_index`: Load the Offset Index as part of [`Self::get_metadata`].
+struct ArrowFileReader<R: FileRead> {
+ meta: FileStatus,
+ r: R,
+}
+
+impl<R: FileRead> ArrowFileReader<R> {
+ /// Create a new ArrowFileReader
+ fn new(meta: FileStatus, r: R) -> Self {
+ Self { meta, r }
+ }
+
+ fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_,
parquet::errors::Result<Bytes>> {
+ Box::pin(self.r.read(range.start..range.end).map_err(|err| {
+ let err_msg = format!("{err}");
+ parquet::errors::ParquetError::External(err_msg.into())
+ }))
+ }
+}
+
+impl<R: FileRead> MetadataFetch for ArrowFileReader<R> {
+ fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_,
parquet::errors::Result<Bytes>> {
+ self.read_bytes(range)
+ }
+}
+
+impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+ fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_,
parquet::errors::Result<Bytes>> {
+ self.read_bytes(range)
+ }
+
+ fn get_metadata(
+ &mut self,
+ options: Option<&ArrowReaderOptions>,
+ ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+ let metadata_opts = options.map(|o| o.metadata_options().clone());
+ Box::pin(async move {
+ let file_size = self.meta.size;
+ let metadata = ParquetMetaDataReader::new()
+ .with_metadata_options(metadata_opts)
+ .load_and_finish(self, file_size)
+ .await?;
+ Ok(Arc::new(metadata))
+ })
+ }
+}
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 086e98b..4b43ffa 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -25,6 +25,7 @@ mod filesystem;
use std::collections::HashMap;
use std::fmt;
+pub use filesystem::*;
use serde::{Deserialize, Serialize};
/// Splitter for system table names (e.g. `table$snapshots`).
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 88cf5f5..6c744b4 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -82,6 +82,15 @@ pub enum Error {
)]
FileIndexFormatInvalid { message: String },
+ #[snafu(
+ visibility(pub(crate)),
+ display("Paimon hitting unexpected parquet error: {}", message)
+ )]
+ ParquetDataUnexpected {
+ message: String,
+ source: Box<parquet::errors::ParquetError>,
+ },
+
// ======================= catalog errors ===============================
#[snafu(display("Database {} already exists.", database))]
DatabaseAlreadyExist { database: String },
@@ -119,3 +128,12 @@ impl From<apache_avro::Error> for Error {
}
}
}
+
+impl From<parquet::errors::ParquetError> for Error {
+ fn from(source: parquet::errors::ParquetError) -> Self {
+ Error::ParquetDataUnexpected {
+ message: format!("Failed to read a Parquet file: {source}"),
+ source: Box::new(source),
+ }
+ }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index b3095a3..d7a8983 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -19,12 +19,17 @@ mod error;
pub use error::Error;
pub use error::Result;
+mod arrow;
pub mod catalog;
mod deletion_vector;
pub mod file_index;
pub mod io;
pub mod spec;
-mod table;
+pub mod table;
+
+pub use catalog::Catalog;
+pub use catalog::FileSystemCatalog;
+
pub use table::{
DataSplit, DataSplitBuilder, Plan, ReadBuilder, SnapshotManager, Table,
TableRead, TableScan,
};
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index 4001638..e0ba04a 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -1237,13 +1237,22 @@ impl Default for VarCharType {
}
}
+/// Type name `STRING`, used in schema JSON as an alias for max-length `VARCHAR` (Java: VarCharType.STRING_TYPE).
+const STRING_TYPE_NAME: &str = "STRING";
+
impl FromStr for VarCharType {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let s = s.trim();
+ if s.starts_with(STRING_TYPE_NAME) {
+ let nullable = !s.contains("NOT NULL");
+ return VarCharType::with_nullable(nullable, Self::MAX_LENGTH);
+ }
if !s.starts_with(serde_utils::VARCHAR::NAME) {
return DataTypeInvalidSnafu {
- message: "Invalid VARCHAR type. Expected string to start with
'VARCHAR'.",
+ message:
+ "Invalid VARCHAR type. Expected string to start with
'VARCHAR' or 'STRING'.",
}
.fail();
}
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 4011f52..0e972b5 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -22,6 +22,9 @@ mod snapshot_manager;
mod source;
mod table_scan;
+use crate::Result;
+use arrow_array::RecordBatch;
+use futures::stream::BoxStream;
pub use read_builder::{ReadBuilder, TableRead};
pub use snapshot_manager::SnapshotManager;
pub use source::{DataSplit, DataSplitBuilder, Plan};
@@ -84,3 +87,6 @@ impl Table {
ReadBuilder::new(self)
}
}
+
+/// A stream of arrow [`RecordBatch`]es.
+pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
diff --git a/crates/paimon/src/table/read_builder.rs
b/crates/paimon/src/table/read_builder.rs
index 14f56b1..4054aa7 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -20,10 +20,11 @@
//! Reference:
[pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py)
//! and
[pypaimon.table.file_store_table.FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
+use super::{ArrowRecordBatchStream, Table, TableScan};
+use crate::arrow::ArrowReaderBuilder;
use crate::spec::DataField;
use crate::Result;
-
-use super::{Table, TableScan};
+use crate::{DataSplit, Error};
/// Builder for table scan and table read (new_scan, new_read).
///
@@ -72,4 +73,27 @@ impl<'a> TableRead<'a> {
pub fn table(&self) -> &Table {
self.table
}
+
+ /// Reads `data_splits` into an [`ArrowRecordBatchStream`] (unpartitioned, non-primary-key tables only).
+ pub fn to_arrow(&self, data_splits: &[DataSplit]) ->
crate::Result<ArrowRecordBatchStream> {
+ // todo: consider get read batch size from table
+ if !self.table.schema.primary_keys().is_empty() {
+ return Err(Error::Unsupported {
+ message: format!(
+ "Reading tables with primary keys is not yet supported.
Primary keys: {:?}",
+ self.table.schema.primary_keys()
+ ),
+ });
+ }
+ if !self.table.schema.partition_keys().is_empty() {
+ return Err(Error::Unsupported {
+ message: format!(
+ "Reading partitioned tables is not yet supported.
Partition keys: {:?}",
+ self.table.schema.partition_keys()
+ ),
+ });
+ }
+ let arrow_reader_builder =
ArrowReaderBuilder::new(self.table.file_io.clone()).build();
+ arrow_reader_builder.read(data_splits)
+ }
}
diff --git a/crates/paimon/src/table/source.rs
b/crates/paimon/src/table/source.rs
index 10486f9..420b662 100644
--- a/crates/paimon/src/table/source.rs
+++ b/crates/paimon/src/table/source.rs
@@ -59,6 +59,18 @@ impl DataSplit {
&self.data_files
}
+ /// Iterate over each data file in this split, yielding `(path, &DataFileMeta)` with `path` = `{bucket_path}/{file_name}`.
+ /// Use this to read each data file one by one (e.g. in ArrowReader).
+ pub fn data_file_entries(&self) -> impl Iterator<Item = (String,
&DataFileMeta)> + '_ {
+ let base = self.bucket_path.trim_end_matches('/');
+ // todo: consider partition table
+ // todo: consider external path
+ self.data_files.iter().map(move |file| {
+ let path = format!("{}/{}", base, file.file_name);
+ (path, file)
+ })
+ }
+
/// Total row count of all data files in this split.
pub fn row_count(&self) -> i64 {
self.data_files.iter().map(|f| f.row_count).sum()
diff --git a/deny.toml b/deny.toml
index 943c3f8..c11cec6 100644
--- a/deny.toml
+++ b/deny.toml
@@ -22,6 +22,7 @@ allow = [
"BSD-2-Clause",
"BSD-3-Clause",
"ISC",
+ "CC0-1.0",
"MIT",
"Unicode-3.0",
"Zlib",
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 00e3cca..435a5b2 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -28,15 +28,14 @@ def main():
# Use Paimon catalog (configured in spark-defaults.conf with warehouse
file:/tmp/paimon-warehouse)
spark.sql("USE paimon.default")
- # Table: simple keyed table for read tests
+ # Table: simple log table for read tests
spark.sql("""
- CREATE TABLE IF NOT EXISTS simple (
+ CREATE TABLE IF NOT EXISTS simple_log_table (
id INT,
name STRING
) USING paimon
- TBLPROPERTIES ('primary-key' = 'id')
""")
- spark.sql("INSERT INTO simple VALUES (1, 'alice'), (2, 'bob'), (3,
'carol')")
+ spark.sql("INSERT INTO simple_log_table VALUES (1, 'alice'), (2, 'bob'),
(3, 'carol')")
if __name__ == "__main__":
main()