This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ba5b2d  feat: support read to arrow (#116)
4ba5b2d is described below

commit 4ba5b2d86cac216bdceab513df96d2e55a5814c1
Author: yuxia Luo <[email protected]>
AuthorDate: Wed Mar 11 11:24:15 2026 +0800

    feat: support read to arrow (#116)
---
 .github/workflows/ci.yml                          |   8 +-
 Cargo.toml                                        |   6 +-
 Cargo.toml => crates/integration_tests/Cargo.toml |  26 ++--
 crates/integration_tests/tests/read_log_tables.rs | 104 +++++++++++++
 crates/paimon/Cargo.toml                          |   6 +-
 crates/paimon/src/{lib.rs => arrow/mod.rs}        |  14 +-
 crates/paimon/src/arrow/reader.rs                 | 170 ++++++++++++++++++++++
 crates/paimon/src/catalog/mod.rs                  |   1 +
 crates/paimon/src/error.rs                        |  18 +++
 crates/paimon/src/lib.rs                          |   7 +-
 crates/paimon/src/spec/types.rs                   |  11 +-
 crates/paimon/src/table/mod.rs                    |   6 +
 crates/paimon/src/table/read_builder.rs           |  28 +++-
 crates/paimon/src/table/source.rs                 |  12 ++
 deny.toml                                         |   1 +
 dev/spark/provision.py                            |   7 +-
 16 files changed, 390 insertions(+), 35 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 7135e15..42128d6 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -77,7 +77,7 @@ jobs:
       - uses: actions/checkout@v4
 
       - name: Test
-        run: cargo test --all-targets --workspace
+        run: cargo test -p paimon --all-targets
         env:
           RUST_LOG: DEBUG
           RUST_BACKTRACE: full
@@ -90,6 +90,12 @@ jobs:
       - name: Start Docker containers
         run: make docker-up
 
+      - name: Integration Test
+        run: cargo test -p paimon-integration-tests --all-targets
+        env:
+          RUST_LOG: DEBUG
+          RUST_BACKTRACE: full
+
       - name: Stop Docker containers
         if: always()
         run: make docker-down
diff --git a/Cargo.toml b/Cargo.toml
index f589335..0ae4fa0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,7 @@
 
 [workspace]
 resolver = "2"
-members = ["crates/paimon"]
+members = ["crates/paimon", "crates/integration_tests"]
 
 [workspace.package]
 version = "0.0.0"
@@ -26,3 +26,7 @@ homepage = "https://paimon.apache.org/";
 repository = "https://github.com/apache/paimon-rust";
 license = "Apache-2.0"
 rust-version = "1.86.0"
+
+[workspace.dependencies]
+arrow-array = "57.0"
+parquet = "57.0"
\ No newline at end of file
diff --git a/Cargo.toml b/crates/integration_tests/Cargo.toml
similarity index 59%
copy from Cargo.toml
copy to crates/integration_tests/Cargo.toml
index f589335..a4753bf 100644
--- a/Cargo.toml
+++ b/crates/integration_tests/Cargo.toml
@@ -9,20 +9,22 @@
 #   http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# software distributed under the License is distributed on
+# an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-resolver = "2"
-members = ["crates/paimon"]
+[package]
+name = "paimon-integration-tests"
+version.workspace = true
+edition.workspace = true
+license.workspace = true
+repository.workspace = true
+homepage.workspace = true
 
-[workspace.package]
-version = "0.0.0"
-edition = "2021"
-homepage = "https://paimon.apache.org/";
-repository = "https://github.com/apache/paimon-rust";
-license = "Apache-2.0"
-rust-version = "1.86.0"
+[dependencies]
+paimon = { path = "../paimon" }
+arrow-array = { workspace = true }
+tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
+futures = "0.3"
diff --git a/crates/integration_tests/tests/read_log_tables.rs 
b/crates/integration_tests/tests/read_log_tables.rs
new file mode 100644
index 0000000..e7b42f7
--- /dev/null
+++ b/crates/integration_tests/tests/read_log_tables.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on
+// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for reading Paimon log tables (system tables).
+//!
+//! Paimon log tables are system tables that contain metadata about the table,
+//! such as snapshots, manifests, schemas, etc. They are stored as Parquet 
files
+//! and can be read using the Arrow reader.
+
+use arrow_array::{Int32Array, StringArray};
+use futures::TryStreamExt;
+use paimon::catalog::Identifier;
+use paimon::{Catalog, FileSystemCatalog};
+
+/// Get the test warehouse path from environment variable or use default.
+fn get_test_warehouse() -> String {
+    std::env::var("PAIMON_TEST_WAREHOUSE").unwrap_or_else(|_| 
"/tmp/paimon-warehouse".to_string())
+}
+
+/// Test reading a table and verifying the data matches expected values.
+///
+/// The table was populated with: (1, 'alice'), (2, 'bob'), (3, 'carol')
+#[tokio::test]
+async fn test_read_log_table() {
+    let warehouse = get_test_warehouse();
+    let catalog = FileSystemCatalog::new(warehouse).expect("Failed to create 
catalog");
+
+    // Get the table
+    let identifier = Identifier::new("default", "simple_log_table");
+
+    let table = catalog
+        .get_table(&identifier)
+        .await
+        .expect("Failed to get table");
+
+    // Scan the table
+    let read_builder = table.new_read_builder();
+    let read = read_builder.new_read().expect("Failed to create read");
+    let scan = read_builder.new_scan();
+
+    let plan = scan.plan().await.expect("Failed to plan scan");
+
+    // Read to Arrow
+    let stream = read
+        .to_arrow(plan.splits())
+        .expect("Failed to create arrow stream");
+
+    let batches: Vec<_> = stream
+        .try_collect()
+        .await
+        .expect("Failed to collect batches");
+
+    assert!(
+        !batches.is_empty(),
+        "Expected at least one batch from table"
+    );
+
+    // Collect all rows as (id, name) tuples
+    let mut actual_rows: Vec<(i32, String)> = Vec::new();
+
+    for batch in &batches {
+        let id_array = batch
+            .column_by_name("id")
+            .and_then(|c| c.as_any().downcast_ref::<Int32Array>())
+            .expect("Expected Int32Array for id column");
+        let name_array = batch
+            .column_by_name("name")
+            .and_then(|c| c.as_any().downcast_ref::<StringArray>())
+            .expect("Expected StringArray for name column");
+
+        for i in 0..batch.num_rows() {
+            actual_rows.push((id_array.value(i), 
name_array.value(i).to_string()));
+        }
+    }
+
+    // Expected data: (1, 'alice'), (2, 'bob'), (3, 'carol')
+    let expected_rows = vec![
+        (1, "alice".to_string()),
+        (2, "bob".to_string()),
+        (3, "carol".to_string()),
+    ];
+
+    // Sort for consistent comparison
+    actual_rows.sort_by_key(|(id, _)| *id);
+
+    assert_eq!(
+        actual_rows, expected_rows,
+        "Rows should match expected values"
+    );
+}
diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml
index d0050de..4f6c6a6 100644
--- a/crates/paimon/Cargo.toml
+++ b/crates/paimon/Cargo.toml
@@ -49,9 +49,13 @@ snafu = "0.8.3"
 typed-builder = "^0.19"
 opendal = { version = "0.49", features = ["services-fs"] }
 pretty_assertions = "1"
-apache-avro = { version = "0.17", features = ["snappy"] }
+apache-avro = { version = "0.17", features = ["snappy", "zstandard"] }
 indexmap = "2.5.0"
 roaring = "0.10"
+arrow-array = { workspace = true }
+futures = "0.3"
+parquet = { workspace = true, features = ["async", "zstd"] }
+async-stream = "0.3.6"
 
 [dev-dependencies]
 rand = "0.8.5"
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/arrow/mod.rs
similarity index 74%
copy from crates/paimon/src/lib.rs
copy to crates/paimon/src/arrow/mod.rs
index b3095a3..0ed18b8 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/arrow/mod.rs
@@ -15,16 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod error;
-pub use error::Error;
-pub use error::Result;
+mod reader;
 
-pub mod catalog;
-mod deletion_vector;
-pub mod file_index;
-pub mod io;
-pub mod spec;
-mod table;
-pub use table::{
-    DataSplit, DataSplitBuilder, Plan, ReadBuilder, SnapshotManager, Table, 
TableRead, TableScan,
-};
+pub use crate::arrow::reader::ArrowReaderBuilder;
diff --git a/crates/paimon/src/arrow/reader.rs 
b/crates/paimon/src/arrow/reader.rs
new file mode 100644
index 0000000..209874f
--- /dev/null
+++ b/crates/paimon/src/arrow/reader.rs
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::io::{FileIO, FileRead, FileStatus};
+use crate::table::ArrowRecordBatchStream;
+use crate::{DataSplit, Error};
+use async_stream::try_stream;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{StreamExt, TryFutureExt};
+use parquet::arrow::arrow_reader::ArrowReaderOptions;
+use parquet::arrow::async_reader::{AsyncFileReader, MetadataFetch};
+use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::file::metadata::ParquetMetaData;
+use parquet::file::metadata::ParquetMetaDataReader;
+use std::ops::Range;
+use std::sync::Arc;
+use tokio::try_join;
+
+/// Builder to create ArrowReader
+pub struct ArrowReaderBuilder {
+    batch_size: Option<usize>,
+    file_io: FileIO,
+}
+
+impl ArrowReaderBuilder {
+    /// Create a new ArrowReaderBuilder
+    pub(crate) fn new(file_io: FileIO) -> Self {
+        ArrowReaderBuilder {
+            batch_size: None,
+            file_io,
+        }
+    }
+
+    /// Build the ArrowReader.
+    pub fn build(self) -> ArrowReader {
+        ArrowReader {
+            batch_size: self.batch_size,
+            file_io: self.file_io,
+        }
+    }
+}
+
+/// Reads data from Parquet files
+#[derive(Clone)]
+pub struct ArrowReader {
+    batch_size: Option<usize>,
+    file_io: FileIO,
+}
+
+impl ArrowReader {
+    /// Take a stream of DataSplits and read every data file in each split.
+    /// Returns a stream of Arrow RecordBatches from all files.
+    pub fn read(self, data_splits: &[DataSplit]) -> 
crate::Result<ArrowRecordBatchStream> {
+        let file_io = self.file_io.clone();
+        let batch_size = self.batch_size;
+        let paths_to_read: Vec<String> = data_splits
+            .iter()
+            .flat_map(|ds| ds.data_file_entries().map(|(p, _)| p))
+            .map(|p| {
+                if !p.to_ascii_lowercase().ends_with(".parquet") {
+                    Err(Error::Unsupported {
+                        message: format!(
+                            "unsupported file format: only .parquet is 
supported, got: {p}"
+                        ),
+                    })
+                } else {
+                    Ok(p)
+                }
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+        Ok(try_stream! {
+            for path_to_read in paths_to_read {
+                let parquet_file = file_io.new_input(&path_to_read)?;
+
+                let (parquet_metadata, parquet_reader) = try_join!(
+                    parquet_file.metadata(),
+                    parquet_file.reader()
+                )?;
+
+                let arrow_file_reader = ArrowFileReader::new(parquet_metadata, 
parquet_reader);
+
+                let mut batch_stream_builder =
+                    ParquetRecordBatchStreamBuilder::new(arrow_file_reader)
+                        .await?;
+
+                if let Some(size) = batch_size {
+                    batch_stream_builder = 
batch_stream_builder.with_batch_size(size);
+                }
+
+                let mut batch_stream = batch_stream_builder.build()?;
+
+                while let Some(batch) = batch_stream.next().await {
+                    yield batch?
+                }
+            }
+        }
+        .boxed())
+    }
+}
+
+/// ArrowFileReader is a wrapper around a FileRead that impls parquets 
AsyncFileReader.
+///
+/// # TODO
+///
+/// 
[ParquetObjectReader](https://docs.rs/parquet/latest/src/parquet/arrow/async_reader/store.rs.html#64)
+/// contains the following hints to speed up metadata loading, similar to 
iceberg, we can consider adding them to this struct:
+///
+/// - `metadata_size_hint`: Provide a hint as to the size of the parquet 
file's footer.
+/// - `preload_column_index`: Load the Column Index  as part of 
[`Self::get_metadata`].
+/// - `preload_offset_index`: Load the Offset Index as part of 
[`Self::get_metadata`].
+struct ArrowFileReader<R: FileRead> {
+    meta: FileStatus,
+    r: R,
+}
+
+impl<R: FileRead> ArrowFileReader<R> {
+    /// Create a new ArrowFileReader
+    fn new(meta: FileStatus, r: R) -> Self {
+        Self { meta, r }
+    }
+
+    fn read_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, 
parquet::errors::Result<Bytes>> {
+        Box::pin(self.r.read(range.start..range.end).map_err(|err| {
+            let err_msg = format!("{err}");
+            parquet::errors::ParquetError::External(err_msg.into())
+        }))
+    }
+}
+
+impl<R: FileRead> MetadataFetch for ArrowFileReader<R> {
+    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, 
parquet::errors::Result<Bytes>> {
+        self.read_bytes(range)
+    }
+}
+
+impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, 
parquet::errors::Result<Bytes>> {
+        self.read_bytes(range)
+    }
+
+    fn get_metadata(
+        &mut self,
+        options: Option<&ArrowReaderOptions>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        let metadata_opts = options.map(|o| o.metadata_options().clone());
+        Box::pin(async move {
+            let file_size = self.meta.size;
+            let metadata = ParquetMetaDataReader::new()
+                .with_metadata_options(metadata_opts)
+                .load_and_finish(self, file_size)
+                .await?;
+            Ok(Arc::new(metadata))
+        })
+    }
+}
diff --git a/crates/paimon/src/catalog/mod.rs b/crates/paimon/src/catalog/mod.rs
index 086e98b..4b43ffa 100644
--- a/crates/paimon/src/catalog/mod.rs
+++ b/crates/paimon/src/catalog/mod.rs
@@ -25,6 +25,7 @@ mod filesystem;
 use std::collections::HashMap;
 use std::fmt;
 
+pub use filesystem::*;
 use serde::{Deserialize, Serialize};
 
 /// Splitter for system table names (e.g. `table$snapshots`).
diff --git a/crates/paimon/src/error.rs b/crates/paimon/src/error.rs
index 88cf5f5..6c744b4 100644
--- a/crates/paimon/src/error.rs
+++ b/crates/paimon/src/error.rs
@@ -82,6 +82,15 @@ pub enum Error {
     )]
     FileIndexFormatInvalid { message: String },
 
+    #[snafu(
+        visibility(pub(crate)),
+        display("Paimon hitting unexpected parquet error: {}", message)
+    )]
+    ParquetDataUnexpected {
+        message: String,
+        source: Box<parquet::errors::ParquetError>,
+    },
+
     // ======================= catalog errors ===============================
     #[snafu(display("Database {} already exists.", database))]
     DatabaseAlreadyExist { database: String },
@@ -119,3 +128,12 @@ impl From<apache_avro::Error> for Error {
         }
     }
 }
+
+impl From<parquet::errors::ParquetError> for Error {
+    fn from(source: parquet::errors::ParquetError) -> Self {
+        Error::ParquetDataUnexpected {
+            message: format!("Failed to read a Parquet file: {source}"),
+            source: Box::new(source),
+        }
+    }
+}
diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs
index b3095a3..d7a8983 100644
--- a/crates/paimon/src/lib.rs
+++ b/crates/paimon/src/lib.rs
@@ -19,12 +19,17 @@ mod error;
 pub use error::Error;
 pub use error::Result;
 
+mod arrow;
 pub mod catalog;
 mod deletion_vector;
 pub mod file_index;
 pub mod io;
 pub mod spec;
-mod table;
+pub mod table;
+
+pub use catalog::Catalog;
+pub use catalog::FileSystemCatalog;
+
 pub use table::{
     DataSplit, DataSplitBuilder, Plan, ReadBuilder, SnapshotManager, Table, 
TableRead, TableScan,
 };
diff --git a/crates/paimon/src/spec/types.rs b/crates/paimon/src/spec/types.rs
index 4001638..e0ba04a 100644
--- a/crates/paimon/src/spec/types.rs
+++ b/crates/paimon/src/spec/types.rs
@@ -1237,13 +1237,22 @@ impl Default for VarCharType {
     }
 }
 
+/// Alias for variable-length string used in schema JSON (Java: 
VarCharType.STRING_TYPE).
+const STRING_TYPE_NAME: &str = "STRING";
+
 impl FromStr for VarCharType {
     type Err = Error;
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
+        let s = s.trim();
+        if s.starts_with(STRING_TYPE_NAME) {
+            let nullable = !s.contains("NOT NULL");
+            return VarCharType::with_nullable(nullable, Self::MAX_LENGTH);
+        }
         if !s.starts_with(serde_utils::VARCHAR::NAME) {
             return DataTypeInvalidSnafu {
-                message: "Invalid VARCHAR type. Expected string to start with 
'VARCHAR'.",
+                message:
+                    "Invalid VARCHAR type. Expected string to start with 
'VARCHAR' or 'STRING'.",
             }
             .fail();
         }
diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs
index 4011f52..0e972b5 100644
--- a/crates/paimon/src/table/mod.rs
+++ b/crates/paimon/src/table/mod.rs
@@ -22,6 +22,9 @@ mod snapshot_manager;
 mod source;
 mod table_scan;
 
+use crate::Result;
+use arrow_array::RecordBatch;
+use futures::stream::BoxStream;
 pub use read_builder::{ReadBuilder, TableRead};
 pub use snapshot_manager::SnapshotManager;
 pub use source::{DataSplit, DataSplitBuilder, Plan};
@@ -84,3 +87,6 @@ impl Table {
         ReadBuilder::new(self)
     }
 }
+
+/// A stream of arrow [`RecordBatch`]es.
+pub type ArrowRecordBatchStream = BoxStream<'static, Result<RecordBatch>>;
diff --git a/crates/paimon/src/table/read_builder.rs 
b/crates/paimon/src/table/read_builder.rs
index 14f56b1..4054aa7 100644
--- a/crates/paimon/src/table/read_builder.rs
+++ b/crates/paimon/src/table/read_builder.rs
@@ -20,10 +20,11 @@
 //! Reference: 
[pypaimon.read.read_builder.ReadBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/read/read_builder.py)
 //! and 
[pypaimon.table.file_store_table.FileStoreTable.new_read_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py).
 
+use super::{ArrowRecordBatchStream, Table, TableScan};
+use crate::arrow::ArrowReaderBuilder;
 use crate::spec::DataField;
 use crate::Result;
-
-use super::{Table, TableScan};
+use crate::{DataSplit, Error};
 
 /// Builder for table scan and table read (new_scan, new_read).
 ///
@@ -72,4 +73,27 @@ impl<'a> TableRead<'a> {
     pub fn table(&self) -> &Table {
         self.table
     }
+
+    /// Returns an [`ArrowRecordBatchStream`].
+    pub fn to_arrow(&self, data_splits: &[DataSplit]) -> 
crate::Result<ArrowRecordBatchStream> {
+        // todo: consider get read batch size from table
+        if !self.table.schema.primary_keys().is_empty() {
+            return Err(Error::Unsupported {
+                message: format!(
+                    "Reading tables with primary keys is not yet supported. 
Primary keys: {:?}",
+                    self.table.schema.primary_keys()
+                ),
+            });
+        }
+        if !self.table.schema.partition_keys().is_empty() {
+            return Err(Error::Unsupported {
+                message: format!(
+                    "Reading partitioned tables is not yet supported. 
Partition keys: {:?}",
+                    self.table.schema.partition_keys()
+                ),
+            });
+        }
+        let arrow_reader_builder = 
ArrowReaderBuilder::new(self.table.file_io.clone()).build();
+        arrow_reader_builder.read(data_splits)
+    }
 }
diff --git a/crates/paimon/src/table/source.rs 
b/crates/paimon/src/table/source.rs
index 10486f9..420b662 100644
--- a/crates/paimon/src/table/source.rs
+++ b/crates/paimon/src/table/source.rs
@@ -59,6 +59,18 @@ impl DataSplit {
         &self.data_files
     }
 
+    /// Iterate over each data file in this split, yielding `(path, 
&DataFileMeta)`.
+    /// Use this to read each data file one by one (e.g. in ArrowReader).
+    pub fn data_file_entries(&self) -> impl Iterator<Item = (String, 
&DataFileMeta)> + '_ {
+        let base = self.bucket_path.trim_end_matches('/');
+        // todo: consider partition table
+        // todo: consider external path
+        self.data_files.iter().map(move |file| {
+            let path = format!("{}/{}", base, file.file_name);
+            (path, file)
+        })
+    }
+
     /// Total row count of all data files in this split.
     pub fn row_count(&self) -> i64 {
         self.data_files.iter().map(|f| f.row_count).sum()
diff --git a/deny.toml b/deny.toml
index 943c3f8..c11cec6 100644
--- a/deny.toml
+++ b/deny.toml
@@ -22,6 +22,7 @@ allow = [
     "BSD-2-Clause",
     "BSD-3-Clause",
     "ISC",
+    "CC0-1.0",
     "MIT",
     "Unicode-3.0",
     "Zlib",
diff --git a/dev/spark/provision.py b/dev/spark/provision.py
index 00e3cca..435a5b2 100644
--- a/dev/spark/provision.py
+++ b/dev/spark/provision.py
@@ -28,15 +28,14 @@ def main():
     # Use Paimon catalog (configured in spark-defaults.conf with warehouse 
file:/tmp/paimon-warehouse)
     spark.sql("USE paimon.default")
 
-    # Table: simple keyed table for read tests
+    # Table: simple log table for read tests
     spark.sql("""
-        CREATE TABLE IF NOT EXISTS simple (
+        CREATE TABLE IF NOT EXISTS simple_log_table (
             id INT,
             name STRING
         ) USING paimon
-        TBLPROPERTIES ('primary-key' = 'id')
     """)
-    spark.sql("INSERT INTO simple VALUES (1, 'alice'), (2, 'bob'), (3, 
'carol')")
+    spark.sql("INSERT INTO simple_log_table VALUES (1, 'alice'), (2, 'bob'), 
(3, 'carol')")
 
 if __name__ == "__main__":
     main()

Reply via email to