This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new fdb65197e73 [feat](mtmv)mtmv support paimon partition refresh(#43959,#44911) (#45878)
fdb65197e73 is described below
commit fdb65197e73f058ab88b90351e0891ac7ab83648
Author: zhangdong <[email protected]>
AuthorDate: Wed Dec 25 12:08:27 2024 +0800
[feat](mtmv)mtmv support paimon partition refresh(#43959,#44911) (#45878)
pick: #44911 #43959
Only the Paimon-related code is picked; some of the MTMV REFRESH code is not picked.
---
.../apache/doris/datasource/ExternalCatalog.java | 9 +-
.../doris/datasource/ExternalMetaCacheMgr.java | 12 +
.../doris/datasource/ExternalSchemaCache.java | 6 +-
.../org/apache/doris/datasource/ExternalTable.java | 9 +-
.../doris/datasource/hive/HMSExternalTable.java | 7 +-
.../maxcompute/MaxComputeExternalTable.java | 2 +-
.../MvccUtil.java} | 45 ++--
.../datasource/paimon/PaimonExternalTable.java | 227 +++++++++--------
.../datasource/paimon/PaimonMetadataCache.java | 144 +++++++++++
.../datasource/paimon/PaimonMetadataCacheMgr.java | 49 ++++
...hemaCacheValue.java => PaimonMvccSnapshot.java} | 21 +-
.../doris/datasource/paimon/PaimonPartition.java | 61 +++++
...emaCacheValue.java => PaimonPartitionInfo.java} | 31 ++-
...imonSchemaCacheValue.java => PaimonSchema.java} | 29 ++-
.../datasource/paimon/PaimonSchemaCacheKey.java | 55 +++++
.../datasource/paimon/PaimonSchemaCacheValue.java | 12 +-
...onSchemaCacheValue.java => PaimonSnapshot.java} | 25 +-
.../datasource/paimon/PaimonSnapshotCacheKey.java | 75 ++++++
...cheValue.java => PaimonSnapshotCacheValue.java} | 24 +-
.../apache/doris/datasource/paimon/PaimonUtil.java | 275 +++++++++++++++++++++
.../datasource/paimon/source/PaimonSource.java | 3 +-
.../apache/doris/job/extensions/mtmv/MTMVTask.java | 14 ++
.../org/apache/doris/nereids/StatementContext.java | 26 +-
.../rules/rewrite/PruneFileScanPartition.java | 9 +-
.../plans/commands/UpdateMvByPartitionCommand.java | 7 +
.../trees/plans/logical/LogicalFileScan.java | 7 +-
.../java/org/apache/doris/mtmv/PaimonUtilTest.java | 71 ++++++
regression-test/data/mtmv_p0/test_paimon_mtmv.out | 9 -
.../suites/mtmv_p0/test_paimon_mtmv.groovy | 62 -----
29 files changed, 1038 insertions(+), 288 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index e1d37b009e1..f3728498f2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -40,6 +40,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.es.EsExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
@@ -447,13 +448,13 @@ public abstract class ExternalCatalog
}
}
- public final Optional<SchemaCacheValue> getSchema(String dbName, String
tblName) {
+ public final Optional<SchemaCacheValue> getSchema(SchemaCacheKey key) {
makeSureInitialized();
- Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
+ Optional<ExternalDatabase<? extends ExternalTable>> db =
getDb(key.getDbName());
if (db.isPresent()) {
- Optional<? extends ExternalTable> table =
db.get().getTable(tblName);
+ Optional<? extends ExternalTable> table =
db.get().getTable(key.getTblName());
if (table.isPresent()) {
- return table.get().initSchemaAndUpdateTime();
+ return table.get().initSchemaAndUpdateTime(key);
}
}
return Optional.empty();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
index cc40ad292ce..24f55e74266 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalMetaCacheMgr.java
@@ -31,6 +31,8 @@ import
org.apache.doris.datasource.iceberg.IcebergMetadataCacheMgr;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCache;
import org.apache.doris.datasource.maxcompute.MaxComputeMetadataCacheMgr;
import org.apache.doris.datasource.metacache.MetaCache;
+import org.apache.doris.datasource.paimon.PaimonMetadataCache;
+import org.apache.doris.datasource.paimon.PaimonMetadataCacheMgr;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.nereids.exceptions.NotSupportedException;
@@ -92,6 +94,7 @@ public class ExternalMetaCacheMgr {
private ExternalRowCountCache rowCountCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
+ private final PaimonMetadataCacheMgr paimonMetadataCacheMgr;
public ExternalMetaCacheMgr() {
rowCountRefreshExecutor = ThreadPoolManager.newDaemonFixedThreadPool(
@@ -122,6 +125,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr = new HudiPartitionMgr(commonRefreshExecutor);
icebergMetadataCacheMgr = new
IcebergMetadataCacheMgr(commonRefreshExecutor);
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
+ paimonMetadataCacheMgr = new
PaimonMetadataCacheMgr(commonRefreshExecutor);
}
public ExecutorService getFileListingExecutor() {
@@ -167,6 +171,10 @@ public class ExternalMetaCacheMgr {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}
+ /**
+ * Returns the Paimon metadata cache. A single instance is shared by all catalogs; its entries
+ * are keyed by catalog, database and table name.
+ */
+ public PaimonMetadataCache getPaimonMetadataCache() {
+ return paimonMetadataCacheMgr.getPaimonMetadataCache();
+ }
+
public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
return
maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
}
@@ -189,6 +197,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr.removePartitionProcessor(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
+ paimonMetadataCacheMgr.removeCache(catalogId);
}
public void invalidateTableCache(long catalogId, String dbName, String
tblName) {
@@ -204,6 +213,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName,
tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName,
tblName);
+ paimonMetadataCacheMgr.invalidateTableCache(catalogId, dbName,
tblName);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName,
tblName, catalogId);
}
@@ -222,6 +232,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
+ paimonMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid db cache for {} in catalog {}", dbName,
catalogId);
}
@@ -239,6 +250,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr.cleanPartitionProcess(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
+ paimonMetadataCacheMgr.invalidateCatalogCache(catalogId);
if (LOG.isDebugEnabled()) {
LOG.debug("invalid catalog cache for {}", catalogId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index ad1c1306e34..e307dbe4fc9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -74,7 +74,7 @@ public class ExternalSchemaCache {
}
private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
- Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName,
key.tblName);
+ Optional<SchemaCacheValue> schema = catalog.getSchema(key);
if (LOG.isDebugEnabled()) {
LOG.debug("load schema for {} in catalog {}", key,
catalog.getName());
}
@@ -83,6 +83,10 @@ public class ExternalSchemaCache {
+ // Convenience overload: wraps db and table name into a plain SchemaCacheKey.
public Optional<SchemaCacheValue> getSchemaValue(String dbName, String
tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
+ return getSchemaValue(key);
+ }
+
+ /**
+ * Returns the schema cached under {@code key}. Taking the key itself (instead of db/table
+ * names) lets callers pass SchemaCacheKey subclasses such as PaimonSchemaCacheKey, whose
+ * extra fields (e.g. a schema id) select a specific schema version.
+ */
+ public Optional<SchemaCacheValue> getSchemaValue(SchemaCacheKey key) {
return schemaCache.get(key);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 716f2bdca44..237457b5a99 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -29,6 +29,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
@@ -307,8 +308,12 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
*
* @return
*/
- public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
+ public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey
key) {
schemaUpdateTime = System.currentTimeMillis();
+ return initSchema(key);
+ }
+
+ /**
+ * Loads the schema for {@code key}. The default implementation ignores the key and delegates
+ * to {@link #initSchema()}; tables whose schema depends on more than the db/table name (for
+ * example PaimonExternalTable, which loads a schema by schema id) override this method.
+ *
+ * @param key schema cache key of this table
+ * @return the loaded schema value, if any
+ */
+ public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
return initSchema();
}
@@ -401,7 +406,7 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
* @param snapshot if not support mvcc, ignore this
* @return partitionName ==> PartitionItem
*/
- protected Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+ public Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return Collections.emptyMap();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index 54d47f0e9b8..b6520116355 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
@@ -331,7 +332,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
- protected Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+ public Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}
@@ -508,6 +509,10 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
+ public Optional<SchemaCacheValue> initSchemaAndUpdateTime(SchemaCacheKey
key) {
+ // The key is not used: Hive tables keep resolving their schema through the key-less
+ // initSchemaAndUpdateTime() overload.
+ return initSchemaAndUpdateTime();
+ }
+
public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table =
((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
index 0f748f59e92..dbbbcf2d6a1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
@@ -92,7 +92,7 @@ public class MaxComputeExternalTable extends ExternalTable {
}
@Override
- protected Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+ public Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
if (getPartitionColumns().isEmpty()) {
return Collections.emptyMap();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
similarity index 50%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
index aaaefe7f32d..ffdaff770e2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mvcc/MvccUtil.java
@@ -15,25 +15,30 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource.paimon;
-
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
-
-import org.apache.paimon.table.Table;
-
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
- private Table paimonTable;
-
- public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
- super(schema);
- this.paimonTable = paimonTable;
- }
-
- public Table getPaimonTable() {
- return paimonTable;
+package org.apache.doris.datasource.mvcc;
+
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Optional;
+
+/**
+ * Helpers for reading the MVCC snapshot recorded in the current {@link StatementContext}.
+ */
+public final class MvccUtil {
+
+    private MvccUtil() {
+        // static helpers only; not meant to be instantiated
+    }
+
+    /**
+     * Returns the snapshot recorded for {@code tableIf} in the current statement context.
+     *
+     * @param tableIf table whose snapshot is requested
+     * @return {@code Optional.empty()} when {@link ConnectContext#get()} returns null or the connect
+     *         context has no {@link StatementContext}; otherwise whatever
+     *         {@link StatementContext#getSnapshot(TableIf)} returns
+     */
+    public static Optional<MvccSnapshot> getSnapshotFromContext(TableIf tableIf) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            return Optional.empty();
+        }
+        StatementContext statementContext = connectContext.getStatementContext();
+        if (statementContext == null) {
+            return Optional.empty();
+        }
+        return statementContext.getSnapshot(tableIf);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 196b01efe2c..e7d1554d9a7 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -18,10 +18,16 @@
package org.apache.doris.datasource.paimon;
import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.ScalarType;
-import org.apache.doris.catalog.Type;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.ExternalSchemaCache;
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -30,30 +36,36 @@ import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.table.system.SchemasTable;
import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.DecimalType;
-import org.apache.paimon.types.MapType;
-import org.apache.paimon.types.RowType;
-import java.util.ArrayList;
+import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
+import java.util.Set;
-public class PaimonExternalTable extends ExternalTable {
+public class PaimonExternalTable extends ExternalTable implements MvccTable {
private static final Logger LOG =
LogManager.getLogger(PaimonExternalTable.class);
+ // Paimon table handle resolved once at construction; getPaimonTable(Optional) derives
+ // snapshot-pinned copies of it through Table#copy.
+ private final Table paimonTable;
+
public PaimonExternalTable(long id, String name, String dbName,
PaimonExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
+ // NOTE(review): this fetches the table from the catalog inside the constructor. Confirm the
+ // lookup is acceptable here, and that no code path creates instances without running this
+ // constructor (the final field would then stay null).
+ this.paimonTable = catalog.getPaimonTable(dbName, name);
}
public String getPaimonCatalogType() {
@@ -67,99 +79,27 @@ public class PaimonExternalTable extends ExternalTable {
}
}
- public Table getPaimonTable() {
- makeSureInitialized();
- Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
- return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue)
value).getPaimonTable()).orElse(null);
+ public Table getPaimonTable(Optional<MvccSnapshot> snapshot) {
+ return paimonTable.copy(
+ Collections.singletonMap(CoreOptions.SCAN_VERSION.key(),
+
String.valueOf(getOrFetchSnapshotCacheValue(snapshot).getSnapshot().getSnapshotId())));
}
- @Override
- public Optional<SchemaCacheValue> initSchema() {
- Table paimonTable = ((PaimonExternalCatalog)
catalog).getPaimonTable(dbName, name);
- TableSchema schema = ((FileStoreTable) paimonTable).schema();
- List<DataField> columns = schema.fields();
- List<Column> tmpSchema =
Lists.newArrayListWithCapacity(columns.size());
- for (DataField field : columns) {
- tmpSchema.add(new Column(field.name().toLowerCase(),
- paimonTypeToDorisType(field.type()), true, null, true,
field.description(), true,
- field.id()));
- }
- return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
- }
-
- private Type
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
- int tsScale = 3; // default
- switch (dataType.getTypeRoot()) {
- case BOOLEAN:
- return Type.BOOLEAN;
- case INTEGER:
- return Type.INT;
- case BIGINT:
- return Type.BIGINT;
- case FLOAT:
- return Type.FLOAT;
- case DOUBLE:
- return Type.DOUBLE;
- case SMALLINT:
- return Type.SMALLINT;
- case TINYINT:
- return Type.TINYINT;
- case VARCHAR:
- case BINARY:
- case CHAR:
- case VARBINARY:
- return Type.STRING;
- case DECIMAL:
- DecimalType decimal = (DecimalType) dataType;
- return ScalarType.createDecimalV3Type(decimal.getPrecision(),
decimal.getScale());
- case DATE:
- return ScalarType.createDateV2Type();
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- if (dataType instanceof org.apache.paimon.types.TimestampType)
{
- tsScale = ((org.apache.paimon.types.TimestampType)
dataType).getPrecision();
- if (tsScale > 6) {
- tsScale = 6;
- }
- } else if (dataType instanceof
org.apache.paimon.types.LocalZonedTimestampType) {
- tsScale =
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
- if (tsScale > 6) {
- tsScale = 6;
- }
- }
- return ScalarType.createDatetimeV2Type(tsScale);
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- if (dataType instanceof
org.apache.paimon.types.LocalZonedTimestampType) {
- tsScale =
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
- if (tsScale > 6) {
- tsScale = 6;
- }
- }
- return ScalarType.createDatetimeV2Type(tsScale);
- case ARRAY:
- ArrayType arrayType = (ArrayType) dataType;
- Type innerType =
paimonPrimitiveTypeToDorisType(arrayType.getElementType());
- return org.apache.doris.catalog.ArrayType.create(innerType,
true);
- case MAP:
- MapType mapType = (MapType) dataType;
- return new org.apache.doris.catalog.MapType(
- paimonTypeToDorisType(mapType.getKeyType()),
paimonTypeToDorisType(mapType.getValueType()));
- case ROW:
- RowType rowType = (RowType) dataType;
- List<DataField> fields = rowType.getFields();
- return new org.apache.doris.catalog.StructType(fields.stream()
- .map(field -> new
org.apache.doris.catalog.StructField(field.name(),
- paimonTypeToDorisType(field.type())))
- .collect(Collectors.toCollection(ArrayList::new)));
- case TIME_WITHOUT_TIME_ZONE:
- return Type.UNSUPPORTED;
- default:
- LOG.warn("Cannot transform unknown type: " +
dataType.getTypeRoot());
- return Type.UNSUPPORTED;
+    /**
+     * Looks up the Doris schema of the given Paimon schema id in the catalog's schema cache.
+     *
+     * @throws CacheException if the cache yields no value for that schema id
+     */
+    public PaimonSchemaCacheValue getPaimonSchemaCacheValue(long schemaId) {
+        ExternalSchemaCache schemaCache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+        Optional<SchemaCacheValue> cached =
+                schemaCache.getSchemaValue(new PaimonSchemaCacheKey(dbName, name, schemaId));
+        if (cached.isPresent()) {
+            return (PaimonSchemaCacheValue) cached.get();
}
+        throw new CacheException("failed to getSchema for: %s.%s.%s.%s",
+                null, catalog.getName(), dbName, name, schemaId);
}
- protected Type paimonTypeToDorisType(org.apache.paimon.types.DataType
type) {
- return paimonPrimitiveTypeToDorisType(type);
+    // Latest snapshot info for this table, served by the shared Paimon metadata cache.
+    private PaimonSnapshotCacheValue getPaimonSnapshotCacheValue() {
+        makeSureInitialized();
+        PaimonMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getPaimonMetadataCache();
+        return metadataCache.getPaimonSnapshot(catalog, dbName, name);
}
@Override
@@ -190,12 +130,6 @@ public class PaimonExternalTable extends ExternalTable {
makeSureInitialized();
try {
long rowCount = 0;
- Optional<SchemaCacheValue> schemaCacheValue =
getSchemaCacheValue();
- Table paimonTable = schemaCacheValue.map(value ->
((PaimonSchemaCacheValue) value).getPaimonTable())
- .orElse(null);
- if (paimonTable == null) {
- return UNKNOWN_ROW_COUNT;
- }
List<Split> splits =
paimonTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
@@ -206,4 +140,87 @@ public class PaimonExternalTable extends ExternalTable {
}
return UNKNOWN_ROW_COUNT;
}
+
+ @Override
+ public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
+ return getPaimonSchemaCacheValue(snapshot).getPartitionColumns();
+ }
+
+ @Override
+ public MvccSnapshot loadSnapshot() {
+ return new PaimonMvccSnapshot(getPaimonSnapshotCacheValue());
+ }
+
+ @Override
+ public Map<String, PartitionItem>
getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
+ return
getOrFetchSnapshotCacheValue(snapshot).getPartitionInfo().getNameToPartitionItem();
+ }
+
+ @Override
+ public boolean supportInternalPartitionPruned() {
+ return true;
+ }
+
+ @Override
+ public List<Column> getFullSchema() {
+ return
getPaimonSchemaCacheValue(MvccUtil.getSnapshotFromContext(this)).getSchema();
+ }
+
+    /**
+     * Builds the Doris schema for the Paimon schema version identified by {@code key}.
+     *
+     * <p>Every Paimon field becomes a Doris column (name lower-cased); columns whose original field
+     * name appears in the schema's partition keys are also collected as partition columns.
+     *
+     * @param key expected to be a {@link PaimonSchemaCacheKey}; it is cast without a check
+     * @return the schema cache value holding all columns and the partition columns
+     * @throws CacheException if the schema cannot be loaded or converted; the original failure is
+     *         kept as the cause
+     */
+    @Override
+    public Optional<SchemaCacheValue> initSchema(SchemaCacheKey key) {
+        makeSureInitialized();
+        PaimonSchemaCacheKey paimonSchemaCacheKey = (PaimonSchemaCacheKey) key;
+        try {
+            PaimonSchema schema = loadPaimonSchemaBySchemaId(paimonSchemaCacheKey);
+            List<DataField> columns = schema.getFields();
+            List<Column> dorisColumns = Lists.newArrayListWithCapacity(columns.size());
+            Set<String> partitionColumnNames = Sets.newHashSet(schema.getPartitionKeys());
+            List<Column> partitionColumns = Lists.newArrayList();
+            for (DataField field : columns) {
+                Column column = new Column(field.name().toLowerCase(),
+                        PaimonUtil.paimonTypeToDorisType(field.type()), true, null, true, field.description(), true,
+                        field.id());
+                dorisColumns.add(column);
+                // partition keys are matched against the original (not lower-cased) field name
+                if (partitionColumnNames.contains(field.name())) {
+                    partitionColumns.add(column);
+                }
+            }
+            return Optional.of(new PaimonSchemaCacheValue(dorisColumns, partitionColumns));
+        } catch (Exception e) {
+            // Pass the caught exception as the cause; passing null would hide why loading failed.
+            throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
+                    e, getCatalog().getName(), key.getDbName(), key.getTblName(),
+                    paimonSchemaCacheKey.getSchemaId());
+        }
+    }
+
+    /**
+     * Reads one schema version from this table's schemas system table.
+     *
+     * @param key identifies the database and the schema id to look up
+     * @return the schema whose id equals {@code key.getSchemaId()}
+     * @throws IOException propagated from reading the system table
+     * @throws CacheException if no returned row carries the requested schema id
+     */
+    private PaimonSchema loadPaimonSchemaBySchemaId(PaimonSchemaCacheKey key) throws IOException {
+        PaimonExternalCatalog paimonCatalog = (PaimonExternalCatalog) getCatalog();
+        Table schemasTable = paimonCatalog.getPaimonTable(key.getDbName(),
+                name + Catalog.SYSTEM_TABLE_SPLITTER + SchemasTable.SCHEMAS);
+        Predicate sameSchemaId = new PredicateBuilder(schemasTable.rowType()).equal(0, key.getSchemaId());
+        // The predicate only narrows the read; rows for other schema ids may still be returned,
+        // so every row is checked again below.
+        int[][] projection = {{0}, {1}, {2}};
+        for (InternalRow row : PaimonUtil.read(schemasTable, projection, sameSchemaId)) {
+            PaimonSchema candidate = PaimonUtil.rowToSchema(row);
+            if (candidate.getSchemaId() == key.getSchemaId()) {
+                return candidate;
+            }
+        }
+        throw new CacheException("failed to initSchema for: %s.%s.%s.%s",
+                null, getCatalog().getName(), key.getDbName(), key.getTblName(), key.getSchemaId());
+    }
+
+    // Schema of the given snapshot, or of the latest cached snapshot when none is given.
+    private PaimonSchemaCacheValue getPaimonSchemaCacheValue(Optional<MvccSnapshot> snapshot) {
+        PaimonSnapshot resolved = getOrFetchSnapshotCacheValue(snapshot).getSnapshot();
+        return getPaimonSchemaCacheValue(resolved.getSchemaId());
+    }
+
+    // Uses the caller-provided snapshot when present; otherwise falls back to the latest cached one.
+    private PaimonSnapshotCacheValue getOrFetchSnapshotCacheValue(Optional<MvccSnapshot> snapshot) {
+        return snapshot.isPresent()
+                ? ((PaimonMvccSnapshot) snapshot.get()).getSnapshotCacheValue()
+                : getPaimonSnapshotCacheValue();
+    }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
new file mode 100644
index 00000000000..5b711e07066
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.CacheFactory;
+import org.apache.doris.common.Config;
+import org.apache.doris.datasource.CacheException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalMetaCacheMgr;
+
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.PartitionsTable;
+import org.apache.paimon.table.system.SnapshotsTable;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Caches, per Paimon table, the latest snapshot (snapshot id and schema id) together with the
+ * table's partition info.
+ *
+ * <p>Entries are keyed by {@link PaimonSnapshotCacheKey} (catalog, database, table) and are
+ * loaded through {@link #loadSnapshot(PaimonSnapshotCacheKey)}.
+ */
+public class PaimonMetadataCache {
+
+ private final LoadingCache<PaimonSnapshotCacheKey,
PaimonSnapshotCacheValue> snapshotCache;
+
+ /**
+ * Builds the snapshot cache on top of {@link CacheFactory}.
+ *
+ * <p>NOTE(review): the two OptionalLong arguments look like expire-after-write (fixed at 28800
+ * seconds) and refresh-after-write (external_cache_expire_time_minutes_after_access converted
+ * to seconds). Confirm against the CacheFactory constructor before relying on this.
+ *
+ * @param executor executor handed to the cache builder (presumably used for asynchronous reloads)
+ */
+ public PaimonMetadataCache(ExecutorService executor) {
+ CacheFactory snapshotCacheFactory = new CacheFactory(
+ OptionalLong.of(28800L),
+
OptionalLong.of(Config.external_cache_expire_time_minutes_after_access * 60),
+ Config.max_external_table_cache_num,
+ true,
+ null);
+ this.snapshotCache = snapshotCacheFactory.buildCache(key ->
loadSnapshot(key), null, executor);
+ }
+
+ /**
+ * Cache loader. It reads the latest snapshot, resolves the partition columns from that
+ * snapshot's schema id, and loads the partition info for those columns.
+ *
+ * <p>NOTE(review): the snapshots and partitions system tables are read in separate calls, so
+ * a commit in between could make the two disagree. Confirm this is acceptable.
+ *
+ * @throws CacheException wrapping any IOException or AnalysisException raised while loading
+ */
+ @NotNull
+ private PaimonSnapshotCacheValue loadSnapshot(PaimonSnapshotCacheKey key) {
+ try {
+ PaimonSnapshot latestSnapshot = loadLatestSnapshot(key);
+ PaimonExternalTable table = (PaimonExternalTable)
key.getCatalog().getDbOrAnalysisException(key.getDbName())
+ .getTableOrAnalysisException(key.getTableName());
+ List<Column> partitionColumns =
table.getPaimonSchemaCacheValue(latestSnapshot.getSchemaId())
+ .getPartitionColumns();
+ PaimonPartitionInfo partitionInfo = loadPartitionInfo(key,
partitionColumns);
+ return new PaimonSnapshotCacheValue(partitionInfo, latestSnapshot);
+ } catch (IOException | AnalysisException e) {
+ throw new CacheException("failed to loadSnapshot for: %s.%s.%s",
+ e, key.getCatalog().getName(), key.getDbName(),
key.getTableName());
+ }
+ }
+
+ /**
+ * Returns an empty {@link PaimonPartitionInfo} when there are no partition columns.
+ * Otherwise it reads the partitions system table and converts the rows using the given
+ * partition columns.
+ */
+ private PaimonPartitionInfo loadPartitionInfo(PaimonSnapshotCacheKey key,
List<Column> partitionColumns)
+ throws IOException, AnalysisException {
+ if (CollectionUtils.isEmpty(partitionColumns)) {
+ return new PaimonPartitionInfo();
+ }
+ List<PaimonPartition> paimonPartitions = loadPartitions(key);
+ return PaimonUtil.generatePartitionInfo(partitionColumns,
paimonPartitions);
+ }
+
+ /**
+ * Reads the rows of the table's partitions system table (table name + SYSTEM_TABLE_SPLITTER +
+ * PartitionsTable.PARTITIONS). Both projection and predicate are passed as null. Each row is
+ * converted with PaimonUtil.rowToPartition.
+ */
+ private List<PaimonPartition> loadPartitions(PaimonSnapshotCacheKey key)
+ throws IOException {
+ Table table = ((PaimonExternalCatalog)
key.getCatalog()).getPaimonTable(key.getDbName(),
+ key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER +
PartitionsTable.PARTITIONS);
+ List<InternalRow> rows = PaimonUtil.read(table, null, null);
+ List<PaimonPartition> res =
Lists.newArrayListWithCapacity(rows.size());
+ for (InternalRow row : rows) {
+ res.add(PaimonUtil.rowToPartition(row));
+ }
+ return res;
+ }
+
+ /**
+ * Scans the snapshots system table and returns the highest snapshot id together with the
+ * schema id stored in that same row.
+ *
+ * <p>NOTE(review): when the snapshots table has no rows, this returns snapshot id 0 and schema
+ * id 0. Confirm that callers handle this case, e.g. PaimonExternalTable#getPaimonTable, which
+ * uses the snapshot id as {@code scan.version}.
+ */
+ private PaimonSnapshot loadLatestSnapshot(PaimonSnapshotCacheKey key)
throws IOException {
+ Table table = ((PaimonExternalCatalog)
key.getCatalog()).getPaimonTable(key.getDbName(),
+ key.getTableName() + Catalog.SYSTEM_TABLE_SPLITTER +
SnapshotsTable.SNAPSHOTS);
+ // snapshotId and schemaId
+ List<InternalRow> rows = PaimonUtil.read(table, new int[][] {{0},
{1}}, null);
+ long latestSnapshotId = 0L;
+ long latestSchemaId = 0L;
+ for (InternalRow row : rows) {
+ long snapshotId = row.getLong(0);
+ if (snapshotId > latestSnapshotId) {
+ latestSnapshotId = snapshotId;
+ latestSchemaId = row.getLong(1);
+ }
+ }
+ return new PaimonSnapshot(latestSnapshotId, latestSchemaId);
+ }
+
+ /** Drops every cached entry that belongs to the given catalog. */
+ public void invalidateCatalogCache(long catalogId) {
+ snapshotCache.asMap().keySet().stream()
+ .filter(key -> key.getCatalog().getId() == catalogId)
+ .forEach(snapshotCache::invalidate);
+ }
+
+ /** Drops the cached entry of one table in the given catalog. */
+ public void invalidateTableCache(long catalogId, String dbName, String
tblName) {
+ snapshotCache.asMap().keySet().stream()
+ .filter(key -> key.getCatalog().getId() == catalogId &&
key.getDbName().equals(dbName)
+ && key.getTableName().equals(
+ tblName))
+ .forEach(snapshotCache::invalidate);
+ }
+
+ /** Drops every cached entry of one database in the given catalog. */
+ public void invalidateDbCache(long catalogId, String dbName) {
+ snapshotCache.asMap().keySet().stream()
+ .filter(key -> key.getCatalog().getId() == catalogId &&
key.getDbName().equals(dbName))
+ .forEach(snapshotCache::invalidate);
+ }
+
+ /**
+ * Returns the cached snapshot info for the table. On a miss, the cache loads it via
+ * {@link #loadSnapshot(PaimonSnapshotCacheKey)}.
+ */
+ public PaimonSnapshotCacheValue getPaimonSnapshot(CatalogIf catalog,
String dbName, String tbName) {
+ PaimonSnapshotCacheKey key = new PaimonSnapshotCacheKey(catalog,
dbName, tbName);
+ return snapshotCache.get(key);
+ }
+
+ /** Reports statistics of the snapshot cache under the name {@code paimon_snapshot_cache}. */
+ public Map<String, Map<String, String>> getCacheStats() {
+ Map<String, Map<String, String>> res = Maps.newHashMap();
+ res.put("paimon_snapshot_cache",
ExternalMetaCacheMgr.getCacheStats(snapshotCache.stats(),
+ snapshotCache.estimatedSize()));
+ return res;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
new file mode 100644
index 00000000000..a282fde665b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCacheMgr.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import java.util.concurrent.ExecutorService;
+
+public class PaimonMetadataCacheMgr {
+
+ private PaimonMetadataCache paimonMetadataCache;
+
+ public PaimonMetadataCacheMgr(ExecutorService executor) {
+ this.paimonMetadataCache = new PaimonMetadataCache(executor);
+ }
+
+ public PaimonMetadataCache getPaimonMetadataCache() {
+ return paimonMetadataCache;
+ }
+
+ public void removeCache(long catalogId) {
+ paimonMetadataCache.invalidateCatalogCache(catalogId);
+ }
+
+ public void invalidateCatalogCache(long catalogId) {
+ paimonMetadataCache.invalidateCatalogCache(catalogId);
+ }
+
+ public void invalidateTableCache(long catalogId, String dbName, String
tblName) {
+ paimonMetadataCache.invalidateTableCache(catalogId, dbName, tblName);
+ }
+
+ public void invalidateDbCache(long catalogId, String dbName) {
+ paimonMetadataCache.invalidateDbCache(catalogId, dbName);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
similarity index 65%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
index aaaefe7f32d..2307e91adb3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMvccSnapshot.java
@@ -17,23 +17,16 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
-import org.apache.paimon.table.Table;
+public class PaimonMvccSnapshot implements MvccSnapshot {
+ private final PaimonSnapshotCacheValue snapshotCacheValue;
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
- private Table paimonTable;
-
- public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
- super(schema);
- this.paimonTable = paimonTable;
+ public PaimonMvccSnapshot(PaimonSnapshotCacheValue snapshotCacheValue) {
+ this.snapshotCacheValue = snapshotCacheValue;
}
- public Table getPaimonTable() {
- return paimonTable;
+ public PaimonSnapshotCacheValue getSnapshotCacheValue() {
+ return snapshotCacheValue;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
new file mode 100644
index 00000000000..545448199b3
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartition.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+//
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
/**
 * One row of the Paimon {@code partitions} system table.
 */
public class PaimonPartition {
    // Partition values rendered as one string, for example: [1, dd]
    private final String partitionValues;
    // Number of records in the partition
    private final long recordCount;
    // Total size of the partition's files, in bytes
    private final long fileSizeInBytes;
    // Number of files in the partition
    private final long fileCount;
    // Last update time of the partition (a millisecond value, as filled in by the caller)
    private final long lastUpdateTime;

    /**
     * @param partitionValues partition values rendered as a string, e.g. {@code [1, dd]}
     * @param recordCount number of records in the partition
     * @param fileSizeInBytes total file size of the partition in bytes
     * @param fileCount number of files in the partition
     * @param lastUpdateTime last update time of the partition
     */
    public PaimonPartition(String partitionValues, long recordCount, long fileSizeInBytes, long fileCount,
            long lastUpdateTime) {
        this.partitionValues = partitionValues;
        this.recordCount = recordCount;
        this.fileSizeInBytes = fileSizeInBytes;
        this.fileCount = fileCount;
        this.lastUpdateTime = lastUpdateTime;
    }

    public String getPartitionValues() {
        return this.partitionValues;
    }

    public long getRecordCount() {
        return this.recordCount;
    }

    public long getFileSizeInBytes() {
        return this.fileSizeInBytes;
    }

    public long getFileCount() {
        return this.fileCount;
    }

    public long getLastUpdateTime() {
        return this.lastUpdateTime;
    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
similarity index 50%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
index aaaefe7f32d..4d3326f8e48 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonPartitionInfo.java
@@ -17,23 +17,32 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.catalog.PartitionItem;
-import org.apache.paimon.table.Table;
+import com.google.common.collect.Maps;
-import java.util.List;
+import java.util.Map;
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
+public class PaimonPartitionInfo {
+ private final Map<String, PartitionItem> nameToPartitionItem;
+ private final Map<String, PaimonPartition> nameToPartition;
- private Table paimonTable;
+ public PaimonPartitionInfo() {
+ this.nameToPartitionItem = Maps.newHashMap();
+ this.nameToPartition = Maps.newHashMap();
+ }
+
+ public PaimonPartitionInfo(Map<String, PartitionItem> nameToPartitionItem,
+ Map<String, PaimonPartition> nameToPartition) {
+ this.nameToPartitionItem = nameToPartitionItem;
+ this.nameToPartition = nameToPartition;
+ }
- public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
- super(schema);
- this.paimonTable = paimonTable;
+ public Map<String, PartitionItem> getNameToPartitionItem() {
+ return nameToPartitionItem;
}
- public Table getPaimonTable() {
- return paimonTable;
+ public Map<String, PaimonPartition> getNameToPartition() {
+ return nameToPartition;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
similarity index 59%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
index aaaefe7f32d..ef26e1ed208 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchema.java
@@ -17,23 +17,30 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
-
-import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataField;
import java.util.List;
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
+public class PaimonSchema {
+ private final long schemaId;
+ private final List<DataField> fields;
+ private final List<String> partitionKeys;
+
+ public PaimonSchema(long schemaId, List<DataField> fields, List<String>
partitionKeys) {
+ this.schemaId = schemaId;
+ this.fields = fields;
+ this.partitionKeys = partitionKeys;
+ }
- private Table paimonTable;
+ public long getSchemaId() {
+ return schemaId;
+ }
- public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
- super(schema);
- this.paimonTable = paimonTable;
+ public List<DataField> getFields() {
+ return fields;
}
- public Table getPaimonTable() {
- return paimonTable;
+ public List<String> getPartitionKeys() {
+ return partitionKeys;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
new file mode 100644
index 00000000000..f74555b369b
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheKey.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.ExternalSchemaCache.SchemaCacheKey;
+
+import com.google.common.base.Objects;
+
+public class PaimonSchemaCacheKey extends SchemaCacheKey {
+ private final long schemaId;
+
+ public PaimonSchemaCacheKey(String dbName, String tableName, long
schemaId) {
+ super(dbName, tableName);
+ this.schemaId = schemaId;
+ }
+
+ public long getSchemaId() {
+ return schemaId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof PaimonSchemaCacheKey)) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ PaimonSchemaCacheKey that = (PaimonSchemaCacheKey) o;
+ return schemaId == that.schemaId;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.hashCode(), schemaId);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
index aaaefe7f32d..ccb530a3cbc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
@@ -20,20 +20,18 @@ package org.apache.doris.datasource.paimon;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.SchemaCacheValue;
-import org.apache.paimon.table.Table;
-
import java.util.List;
public class PaimonSchemaCacheValue extends SchemaCacheValue {
- private Table paimonTable;
+ private List<Column> partitionColumns;
- public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
+ public PaimonSchemaCacheValue(List<Column> schema, List<Column>
partitionColumns) {
super(schema);
- this.paimonTable = paimonTable;
+ this.partitionColumns = partitionColumns;
}
- public Table getPaimonTable() {
- return paimonTable;
+ public List<Column> getPartitionColumns() {
+ return partitionColumns;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
similarity index 65%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
index aaaefe7f32d..4a536dd72cc 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshot.java
@@ -17,23 +17,20 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
/**
 * Identifies a Paimon snapshot by its snapshot id and the schema id it uses.
 */
public class PaimonSnapshot {
    // Paimon snapshot id
    private final long snapshotId;
    // Id of the schema used by the snapshot
    private final long schemaId;

    /**
     * @param snapshotId Paimon snapshot id
     * @param schemaId schema id associated with the snapshot
     */
    public PaimonSnapshot(long snapshotId, long schemaId) {
        this.snapshotId = snapshotId;
        this.schemaId = schemaId;
    }

    public long getSnapshotId() {
        return this.snapshotId;
    }

    public long getSchemaId() {
        return this.schemaId;
    }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
new file mode 100644
index 00000000000..970f111a721
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheKey.java
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.datasource.CatalogIf;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+public class PaimonSnapshotCacheKey {
+ private final CatalogIf catalog;
+ private final String dbName;
+ private final String tableName;
+
+ public PaimonSnapshotCacheKey(CatalogIf catalog, String dbName, String
tableName) {
+ this.catalog = catalog;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ }
+
+ public CatalogIf getCatalog() {
+ return catalog;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PaimonSnapshotCacheKey that = (PaimonSnapshotCacheKey) o;
+ return catalog.getId() == that.catalog.getId()
+ && Objects.equals(dbName, that.dbName)
+ && Objects.equals(tableName, that.tableName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(catalog.getId(), dbName, tableName);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ",
PaimonSnapshotCacheKey.class.getSimpleName() + "[", "]")
+ .add("catalog=" + catalog)
+ .add("dbName='" + dbName + "'")
+ .add("tableName='" + tableName + "'")
+ .toString();
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
similarity index 64%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
index aaaefe7f32d..c50ecdabfde 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSnapshotCacheValue.java
@@ -17,23 +17,21 @@
package org.apache.doris.datasource.paimon;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.datasource.SchemaCacheValue;
+public class PaimonSnapshotCacheValue {
-import org.apache.paimon.table.Table;
+ private final PaimonPartitionInfo partitionInfo;
+ private final PaimonSnapshot snapshot;
-import java.util.List;
-
-public class PaimonSchemaCacheValue extends SchemaCacheValue {
-
- private Table paimonTable;
+ public PaimonSnapshotCacheValue(PaimonPartitionInfo partitionInfo,
PaimonSnapshot snapshot) {
+ this.partitionInfo = partitionInfo;
+ this.snapshot = snapshot;
+ }
- public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
- super(schema);
- this.paimonTable = paimonTable;
+ public PaimonPartitionInfo getPartitionInfo() {
+ return partitionInfo;
}
- public Table getPaimonTable() {
- return paimonTable;
+ public PaimonSnapshot getSnapshot() {
+ return snapshot;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
new file mode 100644
index 00000000000..1f7576dca51
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonUtil.java
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ListPartitionItem;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.hive.HiveUtil;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+
+public class PaimonUtil {
+ private static final Logger LOG = LogManager.getLogger(PaimonUtil.class);
+
+ public static List<InternalRow> read(
+ Table table, @Nullable int[][] projection, @Nullable Predicate
predicate,
+ Pair<ConfigOption<?>, String>... dynamicOptions)
+ throws IOException {
+ Map<String, String> options = new HashMap<>();
+ for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
+ options.put(pair.getKey().key(), pair.getValue());
+ }
+ table = table.copy(options);
+ ReadBuilder readBuilder = table.newReadBuilder();
+ if (projection != null) {
+ readBuilder.withProjection(projection);
+ }
+ if (predicate != null) {
+ readBuilder.withFilter(predicate);
+ }
+ RecordReader<InternalRow> reader =
+
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+ InternalRowSerializer serializer =
+ new InternalRowSerializer(
+ projection == null
+ ? table.rowType()
+ :
Projection.of(projection).project(table.rowType()));
+ List<InternalRow> rows = new ArrayList<>();
+ reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
+ return rows;
+ }
+
+
+ /*
+
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#partitions-table
+
+---------------+----------------+--------------------+--------------------+------------------------+
+ | partition | record_count | file_size_in_bytes|
file_count| last_update_time|
+
+---------------+----------------+--------------------+--------------------+------------------------+
+ | [1] | 1 | 645 | 1
| 2024-06-24 10:25:57.400|
+
+---------------+----------------+--------------------+--------------------+------------------------+
+ org.apache.paimon.table.system.PartitionsTable.TABLE_TYPE
+ public static final RowType TABLE_TYPE =
+ new RowType(
+ Arrays.asList(
+ new DataField(0, "partition",
SerializationUtils.newStringType(true)),
+ new DataField(1, "record_count", new
BigIntType(false)),
+ new DataField(2, "file_size_in_bytes", new
BigIntType(false)),
+ new DataField(3, "file_count", new
BigIntType(false)),
+ new DataField(4, "last_update_time",
DataTypes.TIMESTAMP_MILLIS())));
+ */
+ public static PaimonPartition rowToPartition(InternalRow row) {
+ String partition = row.getString(0).toString();
+ long recordCount = row.getLong(1);
+ long fileSizeInBytes = row.getLong(2);
+ long fileCount = row.getLong(3);
+ long lastUpdateTime = row.getTimestamp(4, 3).getMillisecond();
+ return new PaimonPartition(partition, recordCount, fileSizeInBytes,
fileCount, lastUpdateTime);
+ }
+
+ public static PaimonPartitionInfo generatePartitionInfo(List<Column>
partitionColumns,
+ List<PaimonPartition> paimonPartitions) throws AnalysisException {
+ Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMap();
+ Map<String, PaimonPartition> nameToPartition = Maps.newHashMap();
+ PaimonPartitionInfo partitionInfo = new
PaimonPartitionInfo(nameToPartitionItem, nameToPartition);
+ if (CollectionUtils.isEmpty(partitionColumns)) {
+ return partitionInfo;
+ }
+ for (PaimonPartition paimonPartition : paimonPartitions) {
+ String partitionName = getPartitionName(partitionColumns,
paimonPartition.getPartitionValues());
+ nameToPartition.put(partitionName, paimonPartition);
+ nameToPartitionItem.put(partitionName,
toListPartitionItem(partitionName, partitionColumns));
+ }
+ return partitionInfo;
+ }
+
+ private static String getPartitionName(List<Column> partitionColumns,
String partitionValueStr) {
+ Preconditions.checkNotNull(partitionValueStr);
+ String[] partitionValues = partitionValueStr.replace("[",
"").replace("]", "")
+ .split(",");
+ Preconditions.checkState(partitionColumns.size() ==
partitionValues.length);
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < partitionColumns.size(); ++i) {
+ if (i != 0) {
+ sb.append("/");
+ }
+
sb.append(partitionColumns.get(i).getName()).append("=").append(partitionValues[i]);
+ }
+ return sb.toString();
+ }
+
+ public static ListPartitionItem toListPartitionItem(String partitionName,
List<Column> partitionColumns)
+ throws AnalysisException {
+ List<Type> types = partitionColumns.stream()
+ .map(Column::getType)
+ .collect(Collectors.toList());
+ // Partition name will be in format: nation=cn/city=beijing
+ // parse it to get values "cn" and "beijing"
+ List<String> partitionValues =
HiveUtil.toPartitionValues(partitionName);
+ Preconditions.checkState(partitionValues.size() == types.size(),
partitionName + " vs. " + types);
+ List<PartitionValue> values =
Lists.newArrayListWithExpectedSize(types.size());
+ for (String partitionValue : partitionValues) {
+ // null will in partition 'null'
+ // "null" will in partition 'null'
+ // NULL will in partition 'null'
+ // "NULL" will in partition 'NULL'
+ // values.add(new PartitionValue(partitionValue,
"null".equals(partitionValue)));
+ values.add(new PartitionValue(partitionValue, false));
+ }
+ PartitionKey key =
PartitionKey.createListPartitionKeyWithTypes(values, types, true);
+ ListPartitionItem listPartitionItem = new
ListPartitionItem(Lists.newArrayList(key));
+ return listPartitionItem;
+ }
+
+ private static Type
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
+ int tsScale = 3; // default
+ switch (dataType.getTypeRoot()) {
+ case BOOLEAN:
+ return Type.BOOLEAN;
+ case INTEGER:
+ return Type.INT;
+ case BIGINT:
+ return Type.BIGINT;
+ case FLOAT:
+ return Type.FLOAT;
+ case DOUBLE:
+ return Type.DOUBLE;
+ case SMALLINT:
+ return Type.SMALLINT;
+ case TINYINT:
+ return Type.TINYINT;
+ case VARCHAR:
+ case BINARY:
+ case CHAR:
+ case VARBINARY:
+ return Type.STRING;
+ case DECIMAL:
+ DecimalType decimal = (DecimalType) dataType;
+ return ScalarType.createDecimalV3Type(decimal.getPrecision(),
decimal.getScale());
+ case DATE:
+ return ScalarType.createDateV2Type();
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ if (dataType instanceof org.apache.paimon.types.TimestampType)
{
+ tsScale = ((org.apache.paimon.types.TimestampType)
dataType).getPrecision();
+ if (tsScale > 6) {
+ tsScale = 6;
+ }
+ } else if (dataType instanceof
org.apache.paimon.types.LocalZonedTimestampType) {
+ tsScale =
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
+ if (tsScale > 6) {
+ tsScale = 6;
+ }
+ }
+ return ScalarType.createDatetimeV2Type(tsScale);
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ if (dataType instanceof
org.apache.paimon.types.LocalZonedTimestampType) {
+ tsScale =
((org.apache.paimon.types.LocalZonedTimestampType) dataType).getPrecision();
+ if (tsScale > 6) {
+ tsScale = 6;
+ }
+ }
+ return ScalarType.createDatetimeV2Type(tsScale);
+ case ARRAY:
+ ArrayType arrayType = (ArrayType) dataType;
+ Type innerType =
paimonPrimitiveTypeToDorisType(arrayType.getElementType());
+ return org.apache.doris.catalog.ArrayType.create(innerType,
true);
+ case MAP:
+ MapType mapType = (MapType) dataType;
+ return new org.apache.doris.catalog.MapType(
+ paimonTypeToDorisType(mapType.getKeyType()),
paimonTypeToDorisType(mapType.getValueType()));
+ case ROW:
+ RowType rowType = (RowType) dataType;
+ List<DataField> fields = rowType.getFields();
+ return new org.apache.doris.catalog.StructType(fields.stream()
+ .map(field -> new
org.apache.doris.catalog.StructField(field.name(),
+ paimonTypeToDorisType(field.type())))
+ .collect(Collectors.toCollection(ArrayList::new)));
+ case TIME_WITHOUT_TIME_ZONE:
+ return Type.UNSUPPORTED;
+ default:
+ LOG.warn("Cannot transform unknown type: " +
dataType.getTypeRoot());
+ return Type.UNSUPPORTED;
+ }
+ }
+
+ public static Type paimonTypeToDorisType(org.apache.paimon.types.DataType
type) {
+ return paimonPrimitiveTypeToDorisType(type);
+ }
+
+ /**
+ *
https://paimon.apache.org/docs/0.9/maintenance/system-tables/#schemas-table
+ * demo:
+ * 0
+ * [{"id":0,"name":"user_id","type":"BIGINT NOT NULL"},
+ * {"id":1,"name":"item_id","type":"BIGINT"},
+ * {"id":2,"name":"behavior","type":"STRING"},
+ * {"id":3,"name":"dt","type":"STRING NOT NULL"},
+ * {"id":4,"name":"hh","type":"STRING NOT NULL"}]
+ * ["dt"]
+ * ["dt","hh","user_id"]
+ * {"owner":"hadoop","provider":"paimon"}
+ * 2024-12-03 15:38:14.734
+ *
+ * @param row
+ * @return
+ */
+ public static PaimonSchema rowToSchema(InternalRow row) {
+ long schemaId = row.getLong(0);
+ String fieldsStr = row.getString(1).toString();
+ String partitionKeysStr = row.getString(2).toString();
+ List<DataField> fields = JsonSerdeUtil.fromJson(fieldsStr, new
TypeReference<List<DataField>>() {
+ });
+ List<String> partitionKeys = JsonSerdeUtil.fromJson(partitionKeysStr,
new TypeReference<List<String>>() {
+ });
+ return new PaimonSchema(schemaId, fields, partitionKeys);
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index 885eba06ed9..a8bb814f1d3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.datasource.paimon.PaimonExternalTable;
import org.apache.doris.datasource.property.constants.PaimonProperties;
import org.apache.doris.thrift.TFileAttributes;
@@ -36,7 +37,7 @@ public class PaimonSource {
public PaimonSource(TupleDescriptor desc) {
this.desc = desc;
this.paimonExtTable = (PaimonExternalTable) desc.getTable();
- this.originTable = paimonExtTable.getPaimonTable();
+ this.originTable =
paimonExtTable.getPaimonTable(MvccUtil.getSnapshotFromContext(paimonExtTable));
}
public TupleDescriptor getDesc() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index a26693cb411..3ab1df4734e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -29,6 +29,9 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.datasource.mvcc.MvccSnapshot;
+import org.apache.doris.datasource.mvcc.MvccTable;
+import org.apache.doris.datasource.mvcc.MvccTableInfo;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
@@ -69,6 +72,7 @@ import java.math.RoundingMode;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -140,6 +144,8 @@ public class MTMVTask extends AbstractTask {
private StmtExecutor executor;
private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
+ // MVCC snapshots of this task's MvccTable base tables, keyed by table. Each snapshot is loaded
+ // in the same loop that calls beforeMTMVRefresh. The snapshots are then copied into every new
+ // StatementContext, whose loadSnapshots() keeps a pre-set snapshot instead of loading it again.
+ private final Map<MvccTableInfo, MvccSnapshot> snapshots =
Maps.newHashMap();
+
 // No-arg constructor with an empty body.
 public MTMVTask() {
 }
@@ -229,6 +235,9 @@ public class MTMVTask extends AbstractTask {
throws Exception {
Objects.requireNonNull(ctx, "ctx should not be null");
StatementContext statementContext = new StatementContext();
+ for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
+ statementContext.setSnapshot(entry.getKey(), entry.getValue());
+ }
ctx.setStatementContext(statementContext);
TUniqueId queryId = generateQueryId();
lastQueryId = DebugUtil.printId(queryId);
@@ -302,6 +311,11 @@ public class MTMVTask extends AbstractTask {
MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf;
baseTableIf.beforeMTMVRefresh(mtmv);
}
+ if (tableIf instanceof MvccTable) {
+ MvccTable mvccTable = (MvccTable) tableIf;
+ MvccSnapshot mvccSnapshot = mvccTable.loadSnapshot();
+ snapshots.put(new MvccTableInfo(mvccTable), mvccSnapshot);
+ }
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
index da006c392e1..4ca688f7dd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java
@@ -622,7 +622,11 @@ public class StatementContext implements Closeable {
public void loadSnapshots() {
for (TableIf tableIf : tables.values()) {
if (tableIf instanceof MvccTable) {
- snapshots.put(new MvccTableInfo(tableIf), ((MvccTable)
tableIf).loadSnapshot());
+ MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
+ // may be set by MTMV, we can not load again
+ if (!snapshots.containsKey(mvccTableInfo)) {
+ snapshots.put(mvccTableInfo, ((MvccTable)
tableIf).loadSnapshot());
+ }
}
}
}
@@ -630,11 +634,25 @@ public class StatementContext implements Closeable {
/**
* Obtain snapshot information of mvcc
*
- * @param mvccTable mvccTable
+ * @param tableIf tableIf
* @return MvccSnapshot
*/
- public MvccSnapshot getSnapshot(MvccTable mvccTable) {
- return snapshots.get(new MvccTableInfo(mvccTable));
+ public Optional<MvccSnapshot> getSnapshot(TableIf tableIf) {
+ if (!(tableIf instanceof MvccTable)) {
+ return Optional.empty();
+ }
+ MvccTableInfo mvccTableInfo = new MvccTableInfo(tableIf);
+ return Optional.ofNullable(snapshots.get(mvccTableInfo));
+ }
+
+ /**
+ * Obtain snapshot information of mvcc
+ *
+ * @param mvccTableInfo mvccTableInfo
+ * @param snapshot snapshot
+ */
+ public void setSnapshot(MvccTableInfo mvccTableInfo, MvccSnapshot
snapshot) {
+ snapshots.put(mvccTableInfo, snapshot);
}
private static class CloseableResource implements Closeable {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
index ba8b270d1f3..e99906f5e13 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneFileScanPartition.java
@@ -36,7 +36,6 @@ import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -74,8 +73,8 @@ public class PruneFileScanPartition extends
OneRewriteRuleFactory {
private SelectedPartitions pruneExternalPartitions(ExternalTable
externalTable,
LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan,
CascadesContext ctx) {
Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap();
- // todo: real snapshotId
- if
(CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) {
+ if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(
+ ctx.getStatementContext().getSnapshot(externalTable)))) {
// non partitioned table, return NOT_PRUNED.
// non partition table will be handled in HiveScanNode.
return SelectedPartitions.NOT_PRUNED;
@@ -83,8 +82,8 @@ public class PruneFileScanPartition extends
OneRewriteRuleFactory {
Map<String, Slot> scanOutput = scan.getOutput()
.stream()
.collect(Collectors.toMap(slot ->
slot.getName().toLowerCase(), Function.identity()));
- // todo: real snapshotId
- List<Slot> partitionSlots =
externalTable.getPartitionColumns(Optional.empty())
+ List<Slot> partitionSlots = externalTable.getPartitionColumns(
+ ctx.getStatementContext().getSnapshot(externalTable))
.stream()
.map(column -> scanOutput.get(column.getName().toLowerCase()))
.collect(Collectors.toList());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
index 36cc0f95a77..b0a95ffdd3a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java
@@ -28,6 +28,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.analyzer.UnboundRelation;
@@ -316,6 +318,11 @@ public class UpdateMvByPartitionCommand extends
InsertOverwriteTableCommand {
partitionHasDataItems.add(
((OlapTable)
targetTable).getPartitionInfo().getItem(partition.getId()));
}
+ if (targetTable instanceof ExternalTable) {
+ // Add filter only when partition has data when
external table
+ partitionHasDataItems.add(((ExternalTable)
targetTable).getNameToPartitionItems(
+
MvccUtil.getSnapshotFromContext(targetTable)).get(partitionName));
+ }
}
if (partitionHasDataItems.isEmpty()) {
predicates.setNeedAddFilter(false);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
index 96b8e032d11..1f5f71f7baf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalFileScan.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.mvcc.MvccUtil;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.TableSample;
@@ -60,10 +61,10 @@ public class LogicalFileScan extends LogicalCatalogRelation
{
}
public LogicalFileScan(RelationId id, ExternalTable table, List<String>
qualifier,
- Optional<TableSample> tableSample,
Optional<TableSnapshot> tableSnapshot) {
- // todo: real snapshotId
+ Optional<TableSample> tableSample, Optional<TableSnapshot>
tableSnapshot) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
- table.initSelectedPartitions(Optional.empty()), tableSample,
tableSnapshot);
+
table.initSelectedPartitions(MvccUtil.getSnapshotFromContext(table)),
+ tableSample, tableSnapshot);
}
public SelectedPartitions getSelectedPartitions() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
new file mode 100644
index 00000000000..789af7bf835
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/mtmv/PaimonUtilTest.java
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.mtmv;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PartitionItem;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.datasource.paimon.PaimonPartition;
+import org.apache.doris.datasource.paimon.PaimonPartitionInfo;
+import org.apache.doris.datasource.paimon.PaimonUtil;
+
+import com.google.common.collect.Lists;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+public class PaimonUtilTest {
+ // Tests for PaimonUtil.generatePartitionInfo and PaimonUtil.rowToPartition.
+ @Test
+ public void testGeneratePartitionInfo() throws AnalysisException {
+ Column k1 = new Column("k1", PrimitiveType.INT);
+ Column k2 = new Column("k2", PrimitiveType.VARCHAR);
+ List<Column> partitionColumns = Lists.newArrayList(k1, k2); // column order k1, k2 lines up with "[1,aa]"
+ PaimonPartition p1 = new PaimonPartition("[1,aa]", 2, 3, 4, 5); // "[1,aa]" -> k1=1, k2=aa
+ List<PaimonPartition> paimonPartitions = Lists.newArrayList(p1);
+ PaimonPartitionInfo partitionInfo =
PaimonUtil.generatePartitionInfo(partitionColumns, paimonPartitions);
+ String expectPartitionName = "k1=1/k2=aa"; // "col=value" pairs joined by '/'
+
Assert.assertTrue(partitionInfo.getNameToPartitionItem().containsKey(expectPartitionName));
+ PartitionItem partitionItem =
partitionInfo.getNameToPartitionItem().get(expectPartitionName);
+ List<PartitionKey> keys = partitionItem.getItems();
+ Assert.assertEquals(1, keys.size()); // exactly one PartitionKey for this partition
+ PartitionKey partitionKey = keys.get(0);
+ List<LiteralExpr> exprs = partitionKey.getKeys();
+ Assert.assertEquals(2, exprs.size()); // one literal per partition column
+ Assert.assertEquals(1, exprs.get(0).getLongValue()); // k1 value "1" read back as a long
+ Assert.assertEquals("aa", exprs.get(1).getStringValue());
+ }
+ // rowToPartition maps fields by position: values, recordCount, fileSizeInBytes, fileCount, lastUpdateTime.
+ @Test
+ public void testRowToPartition() {
+ GenericRow row = GenericRow.of(BinaryString.fromString("[1,b]"), 2L,
3L, 4L, Timestamp.fromEpochMillis(5L));
+ PaimonPartition paimonPartition = PaimonUtil.rowToPartition(row);
+ Assert.assertEquals("[1,b]", paimonPartition.getPartitionValues());
+ Assert.assertEquals(2L, paimonPartition.getRecordCount());
+ Assert.assertEquals(3L, paimonPartition.getFileSizeInBytes());
+ Assert.assertEquals(4L, paimonPartition.getFileCount());
+ Assert.assertEquals(5L, paimonPartition.getLastUpdateTime()); // Timestamp read back as epoch millis
+ }
+}
diff --git a/regression-test/data/mtmv_p0/test_paimon_mtmv.out
b/regression-test/data/mtmv_p0/test_paimon_mtmv.out
deleted file mode 100644
index c654cb01214..00000000000
--- a/regression-test/data/mtmv_p0/test_paimon_mtmv.out
+++ /dev/null
@@ -1,9 +0,0 @@
--- This file is automatically generated. You should know what you did if you
want to edit this
--- !catalog --
-1 2 3 4 5 6 7 8 9.1 10.1
11.10 2020-02-02 13str 14varchar a true aaaa
2023-08-13T09:32:38.530
-10 20 30 40 50 60 70 80 90.1 100.1
110.10 2020-03-02 130str 140varchar b false bbbb
2023-08-14T08:32:52.821
-
--- !mtmv --
-1 2 3 4 5 6 7 8 9.1 10.1
11.10 2020-02-02 13str 14varchar a true aaaa
2023-08-13T09:32:38.530
-10 20 30 40 50 60 70 80 90.1 100.1
110.10 2020-03-02 130str 140varchar b false bbbb
2023-08-14T08:32:52.821
-
diff --git a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy
deleted file mode 100644
index e84eb497b2c..00000000000
--- a/regression-test/suites/mtmv_p0/test_paimon_mtmv.groovy
+++ /dev/null
@@ -1,62 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("test_paimon_mtmv",
"p0,external,paimon,external_docker,external_docker_hive") {
- String enabled = context.config.otherConfigs.get("enablePaimonTest")
- logger.info("enabled: " + enabled)
- String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
- logger.info("externalEnvIp: " + externalEnvIp)
- String hdfs_port = context.config.otherConfigs.get("hive2HdfsPort")
- logger.info("hdfs_port: " + hdfs_port)
- if (enabled != null && enabled.equalsIgnoreCase("true")) {
- String catalog_name = "paimon_mtmv_catalog";
- String mvName = "test_paimon_mtmv"
- String dbName = "regression_test_mtmv_p0"
- String paimonDb = "db1"
- String paimonTable = "all_table"
- sql """drop catalog if exists ${catalog_name} """
-
- sql """create catalog if not exists ${catalog_name} properties (
- "type" = "paimon",
- "paimon.catalog.type"="filesystem",
- "warehouse" =
"hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
- );"""
-
- order_qt_catalog """select * from
${catalog_name}.${paimonDb}.${paimonTable}"""
- sql """drop materialized view if exists ${mvName};"""
-
- sql """
- CREATE MATERIALIZED VIEW ${mvName}
- BUILD DEFERRED REFRESH AUTO ON MANUAL
- DISTRIBUTED BY RANDOM BUCKETS 2
- PROPERTIES ('replication_num' = '1')
- AS
- SELECT * FROM ${catalog_name}.${paimonDb}.${paimonTable};
- """
-
- sql """
- REFRESH MATERIALIZED VIEW ${mvName} complete
- """
- def jobName = getJobName(dbName, mvName);
- waitingMTMVTaskFinished(jobName)
- order_qt_mtmv "SELECT * FROM ${mvName}"
-
- sql """drop materialized view if exists ${mvName};"""
- sql """ drop catalog if exists ${catalog_name} """
- }
-}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]