This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cd475b224b1 [opt](paimon) Optimize Paimon Time Travel Implementation
and Fix Schema Consistency Issues (#56023)
cd475b224b1 is described below
commit cd475b224b1e39278aa8bd9f1608d5173f3126ba
Author: Petrichor <[email protected]>
AuthorDate: Tue Sep 23 12:44:08 2025 +0800
[opt](paimon) Optimize Paimon Time Travel Implementation and Fix Schema
Consistency Issues (#56023)
### What problem does this PR solve?
Following PR #55404 which addressed incorrect schema results for Paimon
tables, this PR further optimizes the Paimon time travel implementation
by removing redundant code and fixing the critical issue where schema ID
and snapshot ID were fetched separately, causing consistency problems
and unnecessary I/O overhead.
#### Key Problem Solved
The core challenge in Paimon time travel is efficiently constructing
tables with specified snapshot ID and schema ID. Previously, these were
fetched separately, causing unnecessary I/O overhead and potential
consistency issues.
#### Major Improvements
1. Performance Optimization
- Unified schema and snapshot retrieval: Combined separate API calls for
schema ID and snapshot ID into a single operation
- Removed redundant metadata fetches: Eliminated duplicate calls to
Paimon metadata store
- Optimized branch schema handling: Streamlined branch-specific schema
resolution
2. Bug Fixes
- Fixed timezone-related query errors: Resolved incorrect results when
using Paimon time travel syntax with non-UTC timezones
- Fixed branch schema consistency: Ensured snapshot ID and schema ID are
always fetched atomically to prevent mismatched metadata
3. Enhanced Timestamp Format Support for FOR TIME AS OF
Now supports the following timestamp formats:
- YYYY-MM-DD HH:MM:SS.SSS - Full timestamp with milliseconds (e.g.,
2024-01-15 10:30:45.123)
- YYYY-MM-DD HH:MM:SS - Timestamp with seconds precision (e.g.,
2024-01-15 10:30:45)
- YYYY-MM-DD - Date only format (defaults to 00:00:00.000) (e.g.,
2024-01-15)
Example usage:
```
-- Using different timestamp formats
SELECT * FROM paimon_table FOR TIME AS OF "2024-01-15 10:30:45.123";
SELECT * FROM paimon_table FOR TIME AS OF "2024-01-15 10:30:45";
SELECT * FROM paimon_table FOR TIME AS OF "2024-01-15";
```
---
.../create_preinstalled_scripts/paimon/run09.sql | 24 ++-
.../datasource/paimon/PaimonExternalTable.java | 27 ++-
.../apache/doris/datasource/paimon/PaimonUtil.java | 221 ++-------------------
.../datasource/paimon/source/PaimonScanNode.java | 32 +--
.../paimon/paimon_time_travel.out | 98 +++++++++
.../paimon/paimon_time_travel.groovy | 119 +++++++++--
6 files changed, 268 insertions(+), 253 deletions(-)
diff --git
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
index 058bbfd7e19..87b290e1a55 100644
---
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
+++
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/paimon/run09.sql
@@ -85,4 +85,26 @@ INSERT INTO
test_paimon_time_travel_db.`tbl_time_travel$branch_b_2` VALUES
(20007, 8007, '2024-02-07', 378.45, '465 Lavender Avenue, Thorndale, WY
82201', 'PROCESSING', false),
(20008, 8008, '2024-02-08', 92.30, '729 Iris Lane, Riverside, MN 55987',
'SHIPPED', true),
(20009, 8009, '2024-02-09', 445.80, '856 Tulip Boulevard, Sunnydale, ND
58301', 'PENDING', false),
-(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401',
'CANCELLED', true);
\ No newline at end of file
+(20010, 8010, '2024-02-10', 167.25, '392 Daisy Court, Meadowbrook, SD 57401',
'CANCELLED', true);
+
+
+-- time travel schema change
+ALTER TABLE test_paimon_time_travel_db.tbl_time_travel ADD COLUMNS (
+ new_col1 INT
+);
+
+-- - snapshot 5
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(6001, 9001, '2024-02-11', 456.80, '123 New Street, Downtown, CA 90210',
'COMPLETED', true, 100),
+(6002, 9002, '2024-02-12', 289.45, '456 Updated Ave, Midtown, NY 10001',
'PROCESSING', false, 200),
+(6003, 9003, '2024-02-13', 378.90, '789 Modern Blvd, Uptown, TX 75201',
'SHIPPED', true, 300);
+
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag
=> 't_5', snapshot => 5);
+
+-- - snapshot 6
+INSERT INTO test_paimon_time_travel_db.tbl_time_travel VALUES
+(6004, 9004, '2024-02-14', 199.99, '321 Future Lane, Innovation, WA 98001',
'PENDING', true, 400),
+(6005, 9005, '2024-02-15', 567.25, '654 Progress Drive, Tech City, OR 97201',
'COMPLETED', false, 500),
+(6006, 9006, '2024-02-16', 123.75, '987 Advanced Court, Silicon Valley, CA
94301', 'CANCELLED', true, 600);
+
+CALL sys.create_tag(table => 'test_paimon_time_travel_db.tbl_time_travel', tag
=> 't_6', snapshot => 6);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 013aab4f479..3fba39810a2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -102,20 +102,45 @@ public class PaimonExternalTable extends ExternalTable
implements MTMVRelatedTab
private PaimonSnapshotCacheValue
getPaimonSnapshotCacheValue(Optional<TableSnapshot> tableSnapshot,
Optional<TableScanParams> scanParams) {
makeSureInitialized();
+
+ // Current limitation: cannot specify both table snapshot and scan
parameters simultaneously.
if (tableSnapshot.isPresent() || (scanParams.isPresent() &&
scanParams.get().isTag())) {
// If a snapshot is specified,
// use the specified snapshot and the corresponding schema(not the
latest
// schema).
try {
Snapshot snapshot = PaimonUtil.getPaimonSnapshot(paimonTable,
tableSnapshot, scanParams);
+ Table dataTable = paimonTable.copy(
+
Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(),
String.valueOf(snapshot.id())));
return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
- new PaimonSnapshot(snapshot.id(), snapshot.schemaId(),
paimonTable));
+ new PaimonSnapshot(snapshot.id(), snapshot.schemaId(),
dataTable));
} catch (Exception e) {
LOG.warn("Failed to get Paimon snapshot for table {}",
paimonTable.name(), e);
throw new RuntimeException(
"Failed to get Paimon snapshot: " + (e.getMessage() ==
null ? "unknown cause" : e.getMessage()),
e);
}
+ } else if (scanParams.isPresent() && scanParams.get().isBranch()) {
+ try {
+ String branch =
PaimonUtil.resolvePaimonBranch(scanParams.get(), paimonTable);
+ Table table = ((PaimonExternalCatalog)
catalog).getPaimonTable(getOrBuildNameMapping(), branch, null);
+ Optional<Snapshot> latestSnapshot = table.latestSnapshot();
+ long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
+ if (latestSnapshot.isPresent()) {
+ latestSnapshotId = latestSnapshot.get().id();
+ }
+ // Branches in Paimon can have independent schemas and
snapshots.
+ // TODO: Add time travel support for paimon branch tables.
+ DataTable dataTable = (DataTable) table;
+ Long schemaId =
dataTable.schemaManager().latest().map(TableSchema::id).orElse(0L);
+ return new PaimonSnapshotCacheValue(PaimonPartitionInfo.EMPTY,
+ new PaimonSnapshot(latestSnapshotId, schemaId,
dataTable));
+ } catch (Exception e) {
+ LOG.warn("Failed to get Paimon branch for table {}",
paimonTable.name(), e);
+ throw new RuntimeException(
+ "Failed to get Paimon branch: " + (e.getMessage() ==
null ? "unknown cause" : e.getMessage()),
+ e);
+ }
} else {
// Otherwise, use the latest snapshot and the latest schema.
return
Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache()
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
index 35883cd7316..16a9c5c5b43 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -29,9 +29,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
-import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HiveUtil;
-import org.apache.doris.datasource.paimon.source.PaimonSource;
import org.apache.doris.thrift.TColumnType;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.schema.external.TArrayField;
@@ -48,8 +46,6 @@ import com.google.common.collect.Maps;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.CoreOptions.StartupMode;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -88,7 +84,6 @@ import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
@@ -103,22 +98,6 @@ public class PaimonUtil {
private static final Base64.Encoder BASE64_ENCODER =
java.util.Base64.getUrlEncoder().withoutPadding();
private static final Pattern DIGITAL_REGEX = Pattern.compile("\\d+");
- private static final List<ConfigOption<?>>
PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS = Arrays.asList(
- CoreOptions.SCAN_SNAPSHOT_ID,
- CoreOptions.SCAN_TAG_NAME,
- CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
- CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
- CoreOptions.INCREMENTAL_BETWEEN,
- CoreOptions.INCREMENTAL_TO_AUTO_TAG);
-
- private static final List<ConfigOption<?>>
PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS = Arrays.asList(
- CoreOptions.SCAN_TIMESTAMP_MILLIS,
- CoreOptions.SCAN_TIMESTAMP,
- CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS,
- CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP,
- CoreOptions.INCREMENTAL_BETWEEN,
- CoreOptions.INCREMENTAL_TO_AUTO_TAG);
-
public static List<InternalRow> read(
Table table, @Nullable int[] projection, @Nullable Predicate
predicate,
Pair<ConfigOption<?>, String>... dynamicOptions)
@@ -509,100 +488,6 @@ public class PaimonUtil {
}
}
- /**
- * Builds a snapshot-specific table for time travel queries.
- *
- * @param baseTable the base Paimon table to copy configuration from
- * @param tableSnapshot the snapshot specification (type + value)
- * @return a Table instance configured for the specified time travel query
- * @throws UserException if snapshot configuration is invalid
- */
- public static Table getTableBySnapshot(Table baseTable, TableSnapshot
tableSnapshot)
- throws UserException {
- final String value = tableSnapshot.getValue();
- final TableSnapshot.VersionType type = tableSnapshot.getType();
- final boolean isDigital = DIGITAL_REGEX.matcher(value).matches();
-
- switch (type) {
- case TIME:
- return isDigital
- ? getTableBySnapshotTimestampMillis(baseTable, value)
- : getTableBySnapshotTime(baseTable, value);
-
- case VERSION:
- if (isDigital) {
- return getTableBySnapshotId(baseTable, value);
- }
- return getTableByTag(baseTable, value);
-
- default:
- throw new UserException(String.format("Unsupported version
type: %s", type));
- }
- }
-
- /**
- * Builds a table configured to read from a specific snapshot ID.
- *
- * @param baseTable the base Paimon table to copy configuration from
- * @param snapshotId the snapshot ID as a string
- * @return a Table instance configured to read from the specified snapshot
ID
- */
- private static Table getTableBySnapshotId(Table baseTable, String
snapshotId) {
- Map<String, String> options = new HashMap<>(
- PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);
-
- // For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
- // [scan_tag_name, scan_watermark, scan_snapshot_id]
- options.put(CoreOptions.SCAN_TAG_NAME.key(), null);
- options.put(CoreOptions.SCAN_WATERMARK.key(), null);
- options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), snapshotId);
-
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));
-
- return baseTable.copy(options);
- }
-
- /**
- * Builds a table configured to read from a specific timestamp.
- *
- * @param baseTable the base Paimon table to copy configuration from
- * @param timestampStr the timestamp as a string
- * @return a Table instance configured to read from the specified timestamp
- */
- private static Table getTableBySnapshotTime(Table baseTable, String
timestampStr) {
- Map<String, String> options = new HashMap<>(
- PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);
-
- // For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
- // [scan_timestamp, scan_timestamp_millis]
- options.put(CoreOptions.SCAN_MODE.key(),
StartupMode.FROM_TIMESTAMP.toString());
- options.put(CoreOptions.SCAN_TIMESTAMP.key(), timestampStr);
- options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), null);
-
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));
-
- return baseTable.copy(options);
- }
-
- /**
- * Builds a table configured to read from a specific timestamp in
milliseconds.
- *
- * @param baseTable the base Paimon table to copy configuration from
- * @param timestampStr the timestamp in milliseconds as a string
- * @return a Table instance configured to read from the specified timestamp
- */
- private static Table getTableBySnapshotTimestampMillis(Table baseTable,
String timestampStr) {
- Map<String, String> options = new HashMap<>(
- PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS.size() + 3);
-
- // For Paimon FROM_TIMESTAMP startup mode, must set only one key in:
- // [scan_timestamp, scan_timestamp_millis]
- options.put(CoreOptions.SCAN_MODE.key(),
StartupMode.FROM_TIMESTAMP.toString());
- options.put(CoreOptions.SCAN_TIMESTAMP.key(), null);
- options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), timestampStr);
-
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_TIMESTAMP_CONFLICT_OPTIONS));
-
- return baseTable.copy(options);
- }
-
/**
* Extracts the reference name (branch or tag name) from table scan
parameters.
*
@@ -624,53 +509,6 @@ public class PaimonUtil {
}
}
-
- /**
- * Builds a branch-specific table for time travel queries.
- *
- * @param source the Paimon source containing catalog and table information
- * @param baseTable the base Paimon table
- * @param branchName the branch name
- * @return a Table instance configured to read from the specified branch
- * @throws UserException if branch does not exist
- */
- public static Table getTableByBranch(PaimonSource source, Table baseTable,
String branchName) throws UserException {
-
- if (!checkBranchExists(baseTable, branchName)) {
- throw new UserException(String.format("Branch '%s' does not
exist", branchName));
- }
-
- PaimonExternalCatalog catalog = (PaimonExternalCatalog)
source.getCatalog();
- ExternalTable externalTable = (ExternalTable) source.getTargetTable();
- return catalog.getPaimonTable(externalTable.getOrBuildNameMapping(),
branchName, null);
- }
-
- /**
- * Builds a tag-specific table for time travel queries.
- *
- * @param baseTable the base Paimon table to copy configuration from
- * @param tagName the tag name
- * @return a Table instance configured to read from the specified tag
- * @throws UserException if tag does not exist
- */
- public static Table getTableByTag(Table baseTable, String tagName) throws
UserException {
- if (!checkTagsExists(baseTable, tagName)) {
- throw new UserException(String.format("Tag '%s' does not exist",
tagName));
- }
-
- Map<String, String> options = new HashMap<>(
- PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS.size() + 3);
-
- // For Paimon FROM_SNAPSHOT startup mode, must set only one key in:
- // [scan_tag_name, scan_watermark, scan_snapshot_id]
- options.put(CoreOptions.SCAN_TAG_NAME.key(), tagName);
- options.put(CoreOptions.SCAN_WATERMARK.key(), null);
- options.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), null);
-
options.putAll(excludePaimonConflictOptions(PAIMON_FROM_SNAPSHOT_CONFLICT_OPTIONS));
-
- return baseTable.copy(options);
- }
-
// get snapshot info from query like 'for version/time as of' or '@tag'
public static Snapshot getPaimonSnapshot(Table table,
Optional<TableSnapshot> querySnapshot,
Optional<TableScanParams> scanParams) throws UserException {
@@ -711,14 +549,24 @@ public class PaimonUtil {
if (isDigital) {
timestampMillis = Long.parseLong(timestamp);
} else {
- timestampMillis = TimeUtils.msTimeStringToLong(timestamp,
TimeUtils.getTimeZone());
+ // Supported formats include: yyyy-MM-dd, yyyy-MM-dd HH:mm:ss,
yyyy-MM-dd HH:mm:ss.SSS.
+ // use default local time zone.
+ timestampMillis = DateTimeUtils.parseTimestampData(timestamp, 3,
TimeUtils.getTimeZone()).getMillisecond();
if (timestampMillis < 0) {
throw new DateTimeException("can't parse time: " + timestamp);
}
}
Snapshot snapshot =
table.snapshotManager().earlierOrEqualTimeMills(timestampMillis);
if (snapshot == null) {
- throw new UserException("can't find snapshot older than : " +
timestamp);
+ Snapshot earliestSnapshot =
table.snapshotManager().earliestSnapshot();
+ throw new UserException(
+ String.format(
+ "There is currently no snapshot earlier than or
equal to timestamp [%s], "
+ + "the earliest snapshot's timestamp is
[%s]",
+ timestampMillis,
+ earliestSnapshot == null
+ ? "null"
+ :
String.valueOf(earliestSnapshot.timeMillis())));
}
return snapshot;
}
@@ -740,50 +588,17 @@ public class PaimonUtil {
return tag.orElseThrow(() -> new UserException("can't find snapshot by
tag: " + tagName));
}
- /**
- * Creates a map of conflicting Paimon options with null values for
exclusion.
- *
- * @param illegalOptions the list of ConfigOptions that should be set to
null
- * @return a HashMap containing the illegal options as keys with null
values
- */
- public static Map<String, String>
excludePaimonConflictOptions(List<ConfigOption<?>> illegalOptions) {
- return illegalOptions.stream()
- .collect(HashMap::new,
- (m, option) -> m.put(option.key(), null),
- HashMap::putAll);
- }
-
- /**
- * Checks if a tag exists in the given table.
- *
- * @param baseTable the Paimon table
- * @param tagName the tag name to check
- * @return true if tag exists, false otherwise
- * @throws UserException if table is not a FileStoreTable
- */
- public static boolean checkTagsExists(Table baseTable, String tagName)
throws UserException {
+ public static String resolvePaimonBranch(TableScanParams tableScanParams,
Table baseTable)
+ throws UserException {
+ String branchName = extractBranchOrTagName(tableScanParams);
if (!(baseTable instanceof FileStoreTable)) {
throw new UserException("Table type should be FileStoreTable but
got: " + baseTable.getClass().getName());
}
final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
- return fileStoreTable.tagManager().tagExists(tagName);
- }
-
- /**
- * Checks if a branch exists in the given table.
- *
- * @param baseTable the Paimon table
- * @param branchName the branch name to check
- * @return true if branch exists, false otherwise
- * @throws UserException if table is not a FileStoreTable
- */
- public static boolean checkBranchExists(Table baseTable, String
branchName) throws UserException {
- if (!(baseTable instanceof FileStoreTable)) {
- throw new UserException("Table type should be FileStoreTable but
got: " + baseTable.getClass().getName());
+ if (!fileStoreTable.branchManager().branchExists(branchName)) {
+ throw new UserException("can't find branch: " + branchName);
}
-
- final FileStoreTable fileStoreTable = (FileStoreTable) baseTable;
- return fileStoreTable.branchManager().branchExists(branchName);
+ return branchName;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 7d49643e50d..97bc7cbcf03 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -18,7 +18,6 @@
package org.apache.doris.datasource.paimon.source;
import org.apache.doris.analysis.TableScanParams;
-import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
@@ -681,39 +680,16 @@ public class PaimonScanNode extends FileQueryScanNode {
return paimonScanParams;
}
- /**
- * Processes and returns the appropriate Paimon table object based on scan
parameters or table snapshot.
- * <p>
- * This method handles different scan modes including incremental reads
and system tables,
- * applying the necessary transformations to the base Paimon table.
- *
- * @return processed Paimon table object configured according to scan
parameters
- * @throws UserException when system table configuration is incorrect
- */
private Table getProcessedTable() throws UserException {
Table baseTable = source.getPaimonTable();
- if (getScanParams() != null && getQueryTableSnapshot() != null) {
- throw new UserException("Can not specify scan params and table
snapshot at same time.");
- }
TableScanParams theScanParams = getScanParams();
- if (theScanParams != null) {
- if (theScanParams.incrementalRead()) {
- return baseTable.copy(getIncrReadParams());
- }
-
- if (theScanParams.isBranch()) {
- return PaimonUtil.getTableByBranch(source, baseTable,
PaimonUtil.extractBranchOrTagName(theScanParams));
- }
- if (theScanParams.isTag()) {
- return PaimonUtil.getTableByTag(baseTable,
PaimonUtil.extractBranchOrTagName(theScanParams));
- }
+ if (theScanParams != null && getQueryTableSnapshot() != null) {
+ throw new UserException("Can not specify scan params and table
snapshot at same time.");
}
- TableSnapshot theTableSnapshot = getQueryTableSnapshot();
- if (theTableSnapshot != null) {
- return PaimonUtil.getTableBySnapshot(baseTable, theTableSnapshot);
+ if (theScanParams != null && theScanParams.incrementalRead()) {
+ return baseTable.copy(getIncrReadParams());
}
-
return baseTable;
}
}
diff --git
a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
index 3527cc4604e..93a7c42e165 100644
--- a/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
+++ b/regression-test/data/external_table_p0/paimon/paimon_time_travel.out
@@ -1001,3 +1001,101 @@ true 7
false 5
true 7
+-- !schema_change_snapshot_5_version_count --
+15
+
+-- !schema_change_snapshot_5_version --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true \N
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false \N
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true \N
+2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545
PENDING false \N
+2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226
COMPLETED true \N
+2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024
CANCELLED false \N
+3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602
SHIPPED true \N
+3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012
PROCESSING true \N
+3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI
48097 COMPLETED false \N
+5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309
PENDING true \N
+5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109
SHIPPED false \N
+5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001
COMPLETED true \N
+6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210
COMPLETED true 100
+6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001
PROCESSING false 200
+6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201
SHIPPED true 300
+
+-- !schema_change_snapshot_6_version_count --
+18
+
+-- !schema_change_snapshot_6_version --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true \N
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false \N
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true \N
+2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545
PENDING false \N
+2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226
COMPLETED true \N
+2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024
CANCELLED false \N
+3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602
SHIPPED true \N
+3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012
PROCESSING true \N
+3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI
48097 COMPLETED false \N
+5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309
PENDING true \N
+5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109
SHIPPED false \N
+5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001
COMPLETED true \N
+6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210
COMPLETED true 100
+6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001
PROCESSING false 200
+6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201
SHIPPED true 300
+6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001
PENDING true 400
+6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201
COMPLETED false 500
+6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA
94301 CANCELLED true 600
+
+-- !time_zone_time_travel_basic --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true
+
+-- !time_zone_time_travel_plus08_jni_true --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true
+
+-- !time_zone_time_travel_plus08_jni_false --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true
+
+-- !time_zone_time_travel_plus06_jni_true --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true \N
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false \N
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true \N
+2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545
PENDING false \N
+2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226
COMPLETED true \N
+2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024
CANCELLED false \N
+3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602
SHIPPED true \N
+3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012
PROCESSING true \N
+3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI
48097 COMPLETED false \N
+5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309
PENDING true \N
+5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109
SHIPPED false \N
+5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001
COMPLETED true \N
+6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210
COMPLETED true 100
+6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001
PROCESSING false 200
+6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201
SHIPPED true 300
+6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001
PENDING true 400
+6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201
COMPLETED false 500
+6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA
94301 CANCELLED true 600
+
+-- !time_zone_time_travel_plus06_jni_false --
+1001 2001 2024-01-15 299.99 123 Maple Street, Springfield, IL 62701
COMPLETED true \N
+1002 2002 2024-01-16 156.50 456 Oak Avenue, Riverside, CA 92507
PROCESSING false \N
+1003 2003 2024-01-17 89.00 789 Pine Boulevard, Greenfield, TX
75001 SHIPPED true \N
+2001 3001 2024-01-18 445.75 321 Cedar Lane, Millbrook, NY 12545
PENDING false \N
+2002 3002 2024-01-19 67.25 654 Birch Drive, Lakewood, CO 80226
COMPLETED true \N
+2003 3003 2024-01-20 188.90 987 Elm Court, Fairview, OR 97024
CANCELLED false \N
+3001 4001 2024-01-21 325.40 159 Willow Street, Brookdale, FL 33602
SHIPPED true \N
+3002 4002 2024-01-22 99.85 753 Aspen Road, Clearwater, WA 98012
PROCESSING true \N
+3003 4003 2024-01-23 512.30 264 Chestnut Avenue, Westfield, MI
48097 COMPLETED false \N
+5001 6001 2024-01-24 278.60 842 Hickory Lane, Stonewood, GA 30309
PENDING true \N
+5002 6002 2024-01-25 134.75 417 Poplar Street, Ridgefield, NV 89109
SHIPPED false \N
+5003 6003 2024-01-26 389.20 695 Sycamore Drive, Maplewood, AZ 85001
COMPLETED true \N
+6001 9001 2024-02-11 456.80 123 New Street, Downtown, CA 90210
COMPLETED true 100
+6002 9002 2024-02-12 289.45 456 Updated Ave, Midtown, NY 10001
PROCESSING false 200
+6003 9003 2024-02-13 378.90 789 Modern Blvd, Uptown, TX 75201
SHIPPED true 300
+6004 9004 2024-02-14 199.99 321 Future Lane, Innovation, WA 98001
PENDING true 400
+6005 9005 2024-02-15 567.25 654 Progress Drive, Tech City, OR 97201
COMPLETED false 500
+6006 9006 2024-02-16 123.75 987 Advanced Court, Silicon Valley, CA
94301 CANCELLED true 600
+
diff --git
a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
index 99a4e94a4c6..96e6551aa88 100644
--- a/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
+++ b/regression-test/suites/external_table_p0/paimon/paimon_time_travel.groovy
@@ -16,6 +16,8 @@
// under the License.
import java.time.format.DateTimeFormatter
+import java.time.format.DateTimeFormatterBuilder
+import java.time.temporal.ChronoField
import java.time.LocalDateTime
import java.time.ZoneId
@@ -29,12 +31,23 @@ suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_d
return
}
// Create date time formatter
-
+ DateTimeFormatter unifiedFormatter = new DateTimeFormatterBuilder()
+ .appendPattern("yyyy-MM-dd")
+ .optionalStart()
+ .appendLiteral('T')
+ .optionalEnd()
+ .optionalStart()
+ .appendLiteral(' ')
+ .optionalEnd()
+ .appendPattern("HH:mm:ss")
+ .optionalStart()
+ .appendFraction(ChronoField.MILLI_OF_SECOND, 0, 3, true)
+ .optionalEnd()
+ .toFormatter()
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String catalog_name = "test_paimon_time_travel_catalog"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
- DateTimeFormatter iso_formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")
- DateTimeFormatter standard_formatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
+ DateTimeFormatter outputFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
String db_name = "test_paimon_time_travel_db"
String tableName = "tbl_time_travel"
try {
@@ -54,8 +67,8 @@ suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_d
sql """switch `${catalog_name}`"""
logger.info("switched to catalog " + catalog_name)
sql """use ${db_name}"""
- //system table snapshots to get create time.
- List<List<Object>> snapshotRes = sql """ select
snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id;"""
+ // Query system table snapshots to get creation time. Get the first
four snapshots before schema change (snapshot IDs 1-4)
+ List<List<Object>> snapshotRes = sql """ select
snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id limit
4;"""
logger.info("Query result from ${tableName}\$snapshots:
${snapshotRes}")
assertTrue(snapshotRes.size()==4)
assertTrue(snapshotRes[0].size()==2)
@@ -68,14 +81,8 @@ suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_d
logger.info("Processing snapshot ${index + 1}: ID=${snapshotId},
commit_time=${commitTime}")
try {
- LocalDateTime dateTime;
- if (commitTime.contains("T")){
- dateTime = LocalDateTime.parse(commitTime, iso_formatter)
- }else {
- dateTime = LocalDateTime.parse(commitTime,
standard_formatter)
- }
-
- String snapshotTime =
dateTime.atZone(ZoneId.systemDefault()).format(standard_formatter);
+ LocalDateTime dateTime = LocalDateTime.parse(commitTime,
unifiedFormatter)
+ String snapshotTime =
dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter);
long timestamp = dateTime.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli()
@@ -191,8 +198,8 @@ suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_d
}
}
-
- List<List<Object>> tagsResult = sql """ select snapshot_id,tag_name
from ${tableName}\$tags order by snapshot_id;"""
+ // Get the first 4 tags (ordered by snapshot ID) together with their snapshot IDs
+ List<List<Object>> tagsResult = sql """ select snapshot_id,tag_name
from ${tableName}\$tags order by snapshot_id limit 4;"""
logger.info("Query result from ${tableName}\$tags: ${tagsResult}")
assertTrue(tagsResult.size()==4)
assertTrue(tagsResult[0].size()==2)
@@ -239,13 +246,87 @@ suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_d
}
}
+ /**
+ * Test time travel queries on snapshots created after schema changes.
+ *
+ * Background: run09.sql adds a new column, so the schema evolves after
+ * snapshot ID 4. This test verifies that time travel works
correctly
+ * with snapshots (ID > 4) that use the updated schema.
+ */
+ List<List<Object>> snapshotSchemaChangeAfterRes = sql """ select
snapshot_id,commit_time from ${tableName}\$snapshots where snapshot_id > 4
order by snapshot_id limit 2;"""
+ logger.info("Query result from ${tableName}\$snapshots after schema
change: ${snapshotSchemaChangeAfterRes}")
+
+ snapshotSchemaChangeAfterRes.eachWithIndex { snapshotRow, index ->
+ int snapshotId = snapshotRow[0] as int
+ try {
+ String baseQueryName =
"qt_schema_change_snapshot_${snapshotId}"
+
+ // Time travel by snapshot ID after schema change
+ "${baseQueryName}_version_count" """select count(*) from
${tableName} FOR VERSION AS OF ${snapshotId} ;"""
+ "${baseQueryName}_version" """select * from ${tableName} FOR
VERSION AS OF ${snapshotId} order by order_id;"""
+ logger.info("Completed schema change queries for snapshot
${snapshotId}")
+
+ } catch (Exception e) {
+ logger.error("Failed to process schema change snapshot
${snapshotId}: ${e.message}")
+ throw e
+ }
+ }
+
+
+ // Test time zone behavior with time travel queries
+ List<List<Object>> timeTravelZone = sql """ select
snapshot_id,commit_time from ${tableName}\$snapshots order by snapshot_id limit
1;"""
+ logger.info("Query result from ${tableName}\$snapshots:
${timeTravelZone}")
+
+ String commitTime = timeTravelZone[0][1] as String
+
+ LocalDateTime dateTime = LocalDateTime.parse(commitTime,
unifiedFormatter)
+ String snapshotTime =
dateTime.atZone(ZoneId.systemDefault()).format(outputFormatter)
+
+ try {
+ // Basic time string query
+ qt_time_zone_time_travel_basic """select * from ${tableName} FOR
TIME AS OF \"${snapshotTime}\" order by order_id"""
+
+ // Test with +08:00 timezone
+ sql """set force_jni_scanner=true; set time_zone='+08:00';"""
+ qt_time_zone_time_travel_plus08_jni_true """select * from
${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id"""
+
+ sql """set force_jni_scanner=false;"""
+ qt_time_zone_time_travel_plus08_jni_false """select * from
${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id"""
+
+ // Test with +06:00 timezone
+ sql """set force_jni_scanner=true; set time_zone='+06:00';"""
+ qt_time_zone_time_travel_plus06_jni_true """select * from
${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id"""
+
+ sql """set force_jni_scanner=false;"""
+ qt_time_zone_time_travel_plus06_jni_false """select * from
${tableName} FOR TIME AS OF \"${snapshotTime}\" order by order_id"""
+
+ // Test with +10:00 timezone - these should throw because the resolved timestamp is earlier than the first snapshot
+ sql """set force_jni_scanner=true; set time_zone='+10:00';"""
+ test {
+ sql """select * from ${tableName} FOR TIME AS OF
\"${snapshotTime}\" order by order_id"""
+ exception ("There is currently no snapshot earlier than or
equal to timestamp")
+ }
+
+ sql """set force_jni_scanner=false;"""
+ test {
+ sql """select * from ${tableName} FOR TIME AS OF
\"${snapshotTime}\" order by order_id"""
+ exception ("There is currently no snapshot earlier than or
equal to timestamp")
+ }
+
+ } finally {
+ sql """ unset variable time_zone; """
+ sql """ set force_jni_scanner = false; """
+ }
+
+
+ // Error handling tests
test {
sql """ select * from
${tableName}@branch('name'='not_exists_branch'); """
- exception "Branch 'not_exists_branch' does not exist"
+ exception "can't find branch: not_exists_branch"
}
test {
sql """ select * from ${tableName}@branch(not_exists_branch); """
- exception "Branch 'not_exists_branch' does not exist"
+ exception "can't find branch: not_exists_branch"
}
test {
sql """ select * from ${tableName}@tag('name'='not_exists_tag');
"""
@@ -278,6 +359,4 @@ suite("paimon_time_travel",
"p0,external,doris,external_docker,external_docker_d
} finally {
// sql """drop catalog if exists ${catalog_name}"""
}
-}
-
-
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]