This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 53332eb4ba2 [fix](catalog) refactor the schema cache for external table (#34517) (#34599)
53332eb4ba2 is described below
commit 53332eb4ba23cafcc121fa7f47fbb54c85a10279
Author: Mingyu Chen <[email protected]>
AuthorDate: Thu May 9 18:02:18 2024 +0800
[fix](catalog) refactor the schema cache for external table (#34517) (#34599)
bp #34517
---
.../apache/doris/datasource/ExternalCatalog.java | 9 ++--
.../apache/doris/datasource/ExternalDatabase.java | 2 +-
.../doris/datasource/ExternalSchemaCache.java | 12 ++---
.../org/apache/doris/datasource/ExternalTable.java | 13 +++--
.../apache/doris/datasource/SchemaCacheValue.java | 40 +++++++++++++++
.../doris/datasource/es/EsExternalTable.java | 8 ++-
.../doris/datasource/hive/HMSExternalTable.java | 31 ++++++------
.../doris/datasource/hive/HMSSchemaCacheValue.java | 43 ++++++++++++++++
.../datasource/iceberg/IcebergExternalTable.java | 6 ++-
.../infoschema/ExternalInfoSchemaTable.java | 9 ++--
.../datasource/infoschema/ExternalMysqlTable.java | 8 +--
.../doris/datasource/jdbc/JdbcExternalTable.java | 7 ++-
.../maxcompute/MaxComputeExternalTable.java | 58 +++++++++++++---------
.../maxcompute/MaxComputeSchemaCacheValue.java | 57 +++++++++++++++++++++
.../doris/datasource/metacache/MetaCache.java | 18 +++++--
.../datasource/paimon/PaimonExternalTable.java | 29 ++++++-----
.../datasource/paimon/PaimonSchemaCacheValue.java | 39 +++++++++++++++
.../datasource/paimon/source/PaimonSource.java | 2 +-
.../doris/datasource/test/TestExternalTable.java | 8 +--
.../org/apache/doris/mysql/AcceptListener.java | 4 +-
20 files changed, 309 insertions(+), 94 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 72eacbb1de5..435722e06b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -22,7 +22,6 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InfoSchemaDb;
@@ -386,7 +385,7 @@ public abstract class ExternalCatalog
}
}
- public final List<Column> getSchema(String dbName, String tblName) {
+ public final Optional<SchemaCacheValue> getSchema(String dbName, String
tblName) {
makeSureInitialized();
Optional<ExternalDatabase<? extends ExternalTable>> db = getDb(dbName);
if (db.isPresent()) {
@@ -395,9 +394,7 @@ public abstract class ExternalCatalog
return table.get().initSchemaAndUpdateTime();
}
}
- // return one column with unsupported type.
- // not return empty to avoid some unexpected issue.
- return Lists.newArrayList(Column.UNSUPPORTED_COLUMN);
+ return Optional.empty();
}
@Override
@@ -507,7 +504,7 @@ public abstract class ExternalCatalog
}
if (useMetaCache.get()) {
- return metaCache.getMetaObjById(dbId).get();
+ return metaCache.getMetaObjById(dbId).orElse(null);
} else {
return idToDb.get(dbId);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
index 43c24b5ebd5..6ab3421abc3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalDatabase.java
@@ -370,7 +370,7 @@ public abstract class ExternalDatabase<T extends
ExternalTable>
public T getTableNullable(String tableName) {
makeSureInitialized();
if (extCatalog.getUseMetaCache().get()) {
- return metaCache.getMetaObj(tableName).get();
+ return metaCache.getMetaObj(tableName).orElse(null);
} else {
if (!tableNameToId.containsKey(tableName)) {
return null;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
index 9d0ddcfad2f..ad1c1306e34 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalSchemaCache.java
@@ -31,8 +31,8 @@ import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -42,7 +42,7 @@ public class ExternalSchemaCache {
private static final Logger LOG =
LogManager.getLogger(ExternalSchemaCache.class);
private final ExternalCatalog catalog;
- private LoadingCache<SchemaCacheKey, ImmutableList<Column>> schemaCache;
+ private LoadingCache<SchemaCacheKey, Optional<SchemaCacheValue>>
schemaCache;
public ExternalSchemaCache(ExternalCatalog catalog, ExecutorService
executor) {
this.catalog = catalog;
@@ -73,22 +73,22 @@ public class ExternalSchemaCache {
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(schemaCacheGauge);
}
- private ImmutableList<Column> loadSchema(SchemaCacheKey key) {
- ImmutableList<Column> schema =
ImmutableList.copyOf(catalog.getSchema(key.dbName, key.tblName));
+ private Optional<SchemaCacheValue> loadSchema(SchemaCacheKey key) {
+ Optional<SchemaCacheValue> schema = catalog.getSchema(key.dbName,
key.tblName);
if (LOG.isDebugEnabled()) {
LOG.debug("load schema for {} in catalog {}", key,
catalog.getName());
}
return schema;
}
- public List<Column> getSchema(String dbName, String tblName) {
+ public Optional<SchemaCacheValue> getSchemaValue(String dbName, String
tblName) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
return schemaCache.get(key);
}
public void addSchemaForTest(String dbName, String tblName,
ImmutableList<Column> schema) {
SchemaCacheKey key = new SchemaCacheKey(dbName, tblName);
- schemaCache.put(key, schema);
+ schemaCache.put(key, Optional.of(new SchemaCacheValue(schema)));
}
public void invalidateTableCache(String dbName, String tblName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
index 952b5c64cf8..b394b85054a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalTable.java
@@ -143,7 +143,8 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
@Override
public List<Column> getFullSchema() {
ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
- return cache.getSchema(dbName, name);
+ Optional<SchemaCacheValue> schemaCacheValue =
cache.getSchemaValue(dbName, name);
+ return schemaCacheValue.map(SchemaCacheValue::getSchema).orElse(null);
}
@Override
@@ -161,7 +162,6 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
return getFullSchema();
}
-
@Override
public void setNewFullSchema(List<Column> newSchema) {
}
@@ -301,12 +301,12 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
*
* @return
*/
- public List<Column> initSchemaAndUpdateTime() {
+ public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
schemaUpdateTime = System.currentTimeMillis();
return initSchema();
}
- public List<Column> initSchema() {
+ public Optional<SchemaCacheValue> initSchema() {
throw new NotImplementedException("implement in sub class");
}
@@ -365,4 +365,9 @@ public class ExternalTable implements TableIf, Writable,
GsonPostProcessable {
public List<Long> getChunkSizes() {
throw new NotImplementedException("getChunkSized not implemented");
}
+
+ protected Optional<SchemaCacheValue> getSchemaCacheValue() {
+ ExternalSchemaCache cache =
Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
+ return cache.getSchemaValue(dbName, name);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java
new file mode 100644
index 00000000000..b02b8bda840
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/SchemaCacheValue.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource;
+
+import org.apache.doris.catalog.Column;
+
+import java.util.List;
+
+/**
+ * The cache value of ExternalSchemaCache.
+ * Different external table type has different schema cache value.
+ * For example, Hive table has HMSSchemaCacheValue, Paimon table has
PaimonSchemaCacheValue.
+ * All objects that should be refreshed along with schema should be put in
this class.
+ */
+public class SchemaCacheValue {
+ protected List<Column> schema;
+
+ public SchemaCacheValue(List<Column> schema) {
+ this.schema = schema;
+ }
+
+ public List<Column> getSchema() {
+ return schema;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java
index 6399f89da55..cfde5e794a3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/EsExternalTable.java
@@ -20,11 +20,13 @@ package org.apache.doris.datasource.es;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import java.util.List;
+import java.util.Optional;
/**
* Elasticsearch external table.
@@ -69,9 +71,11 @@ public class EsExternalTable extends ExternalTable {
}
@Override
- public List<Column> initSchema() {
+ public Optional<SchemaCacheValue> initSchema() {
EsRestClient restClient = ((EsExternalCatalog)
catalog).getEsRestClient();
- return EsUtil.genColumnsFromEs(restClient, name, null,
((EsExternalCatalog) catalog).enableMappingEsId());
+ return Optional.of(new SchemaCacheValue(
+ EsUtil.genColumnsFromEs(restClient, name, null,
+ ((EsExternalCatalog) catalog).enableMappingEsId())));
}
private EsTable toEsTable() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
index e29bafd5dc7..c2099a1acc8 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java
@@ -29,6 +29,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.hudi.source.COWIncrementalRelation;
import org.apache.doris.datasource.hudi.source.IncrementalRelation;
@@ -159,7 +160,6 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable =
null;
- private List<Column> partitionColumns;
private DLAType dlaType = DLAType.UNKNOWN;
@@ -296,15 +296,17 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
public List<Type> getPartitionColumnTypes() {
makeSureInitialized();
- getFullSchema();
- return partitionColumns.stream().map(c ->
c.getType()).collect(Collectors.toList());
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((HMSSchemaCacheValue)
value).getPartitionColTypes())
+ .orElse(Collections.emptyList());
}
@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
- getFullSchema();
- return partitionColumns;
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((HMSSchemaCacheValue)
value).getPartitionColumns())
+ .orElse(Collections.emptyList());
}
public TableScanParams getScanParams() {
@@ -532,7 +534,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
- public List<Column> initSchemaAndUpdateTime() {
+ public Optional<SchemaCacheValue> initSchemaAndUpdateTime() {
org.apache.hadoop.hive.metastore.api.Table table =
((HMSExternalCatalog) catalog).getClient()
.getTable(dbName, name);
// try to use transient_lastDdlTime from hms client
@@ -554,7 +556,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
}
@Override
- public List<Column> initSchema() {
+ public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
List<Column> columns;
if (dlaType.equals(DLAType.ICEBERG)) {
@@ -564,8 +566,8 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
} else {
columns = getHiveSchema();
}
- initPartitionColumns(columns);
- return columns;
+ List<Column> partitionColumns = initPartitionColumns(columns);
+ return Optional.of(new HMSSchemaCacheValue(columns, partitionColumns));
}
private List<Column> getIcebergSchema() {
@@ -585,18 +587,16 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
private List<Column> getHiveSchema() {
HMSCachedClient client = ((HMSExternalCatalog) catalog).getClient();
- List<Column> columns;
List<FieldSchema> schema = client.getSchema(dbName, name);
Map<String, String> colDefaultValues =
client.getDefaultColumnValues(dbName, name);
- List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
+ List<Column> columns = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
String fieldName = field.getName().toLowerCase(Locale.ROOT);
String defaultValue = colDefaultValues.getOrDefault(fieldName,
null);
- tmpSchema.add(new Column(fieldName,
+ columns.add(new Column(fieldName,
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, defaultValue, field.getComment(), true, -1));
}
- columns = tmpSchema;
return columns;
}
@@ -613,10 +613,10 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
return rowCount;
}
- private void initPartitionColumns(List<Column> schema) {
+ private List<Column> initPartitionColumns(List<Column> schema) {
List<String> partitionKeys =
remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
- partitionColumns =
Lists.newArrayListWithCapacity(partitionKeys.size());
+ List<Column> partitionColumns =
Lists.newArrayListWithCapacity(partitionKeys.size());
for (String partitionKey : partitionKeys) {
// Do not use "getColumn()", which will cause dead loop
for (Column column : schema) {
@@ -636,6 +636,7 @@ public class HMSExternalTable extends ExternalTable
implements MTMVRelatedTableI
if (LOG.isDebugEnabled()) {
LOG.debug("get {} partition columns for table: {}",
partitionColumns.size(), name);
}
+ return partitionColumns;
}
public boolean hasColumnStatistics(String colName) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java
new file mode 100644
index 00000000000..79631e90db0
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSSchemaCacheValue.java
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.SchemaCacheValue;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class HMSSchemaCacheValue extends SchemaCacheValue {
+
+ private List<Column> partitionColumns;
+
+ public HMSSchemaCacheValue(List<Column> schema, List<Column>
partitionColumns) {
+ super(schema);
+ this.partitionColumns = partitionColumns;
+ }
+
+ public List<Column> getPartitionColumns() {
+ return partitionColumns;
+ }
+
+ public List<Type> getPartitionColTypes() {
+ return
partitionColumns.stream().map(Column::getType).collect(Collectors.toList());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
index 5266d8745de..de9c3814fd6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalTable.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -29,6 +30,7 @@ import org.apache.doris.thrift.TTableType;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
public class IcebergExternalTable extends ExternalTable {
@@ -48,8 +50,8 @@ public class IcebergExternalTable extends ExternalTable {
}
@Override
- public List<Column> initSchema() {
- return IcebergUtils.getSchema(catalog, dbName, name);
+ public Optional<SchemaCacheValue> initSchema() {
+ return Optional.of(new
SchemaCacheValue(IcebergUtils.getSchema(catalog, dbName, name)));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java
index 6faf965752b..9d133639612 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalInfoSchemaTable.java
@@ -18,16 +18,16 @@
package org.apache.doris.datasource.infoschema;
import org.apache.doris.analysis.SchemaTableType;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TSchemaTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
-import java.util.List;
+import java.util.Optional;
public class ExternalInfoSchemaTable extends ExternalTable {
@@ -36,10 +36,9 @@ public class ExternalInfoSchemaTable extends ExternalTable {
}
@Override
- public List<Column> initSchema() {
+ public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
- List<Column> columns = SchemaTable.TABLE_MAP.get(name).getFullSchema();
- return columns;
+ return Optional.of(new
SchemaCacheValue(SchemaTable.TABLE_MAP.get(name).getFullSchema()));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java
index fc64bc053a4..6f277a56906 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/infoschema/ExternalMysqlTable.java
@@ -18,16 +18,16 @@
package org.apache.doris.datasource.infoschema;
import org.apache.doris.analysis.SchemaTableType;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MysqlDBTable;
import org.apache.doris.catalog.MysqlDb;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TSchemaTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
-import java.util.List;
+import java.util.Optional;
public class ExternalMysqlTable extends ExternalTable {
public ExternalMysqlTable(long id, String name, ExternalCatalog catalog) {
@@ -35,9 +35,9 @@ public class ExternalMysqlTable extends ExternalTable {
}
@Override
- public List<Column> initSchema() {
+ public Optional<SchemaCacheValue> initSchema() {
makeSureInitialized();
- return MysqlDBTable.TABLE_MAP.get(name).getFullSchema();
+ return Optional.of(new
SchemaCacheValue(MysqlDBTable.TABLE_MAP.get(name).getFullSchema()));
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
index 64fd25525e5..242b973b87e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalTable.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.jdbc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.JdbcAnalysisTask;
@@ -29,6 +30,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.Optional;
/**
* Elasticsearch external table.
@@ -71,8 +73,9 @@ public class JdbcExternalTable extends ExternalTable {
}
@Override
- public List<Column> initSchema() {
- return ((JdbcExternalCatalog)
catalog).getJdbcClient().getColumnsFromJdbc(dbName, name);
+ public Optional<SchemaCacheValue> initSchema() {
+ return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog)
catalog).getJdbcClient()
+ .getColumnsFromJdbc(dbName, name)));
}
private JdbcTable toJdbcTable() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
index 363b7ce689d..297a4c0fa09 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeExternalTable.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.thrift.TMCTable;
import org.apache.doris.thrift.TTableDescriptor;
@@ -43,11 +44,13 @@ import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -55,12 +58,6 @@ import java.util.stream.Collectors;
* MaxCompute external table.
*/
public class MaxComputeExternalTable extends ExternalTable {
-
- private Table odpsTable;
- private List<String> partitionSpecs;
- private Map<String, Column> partitionNameToColumns;
- private List<Type> partitionTypes;
-
public MaxComputeExternalTable(long id, String name, String dbName,
MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
}
@@ -69,8 +66,6 @@ public class MaxComputeExternalTable extends ExternalTable {
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
- odpsTable = ((MaxComputeExternalCatalog)
catalog).getClient().tables().get(name);
- initTablePartitions();
objectCreated = true;
}
}
@@ -100,26 +95,37 @@ public class MaxComputeExternalTable extends ExternalTable
{
@Override
public Set<String> getPartitionNames() {
makeSureInitialized();
- return partitionNameToColumns.keySet();
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue)
value).getPartitionColNames())
+ .orElse(Collections.emptySet());
}
public List<Column> getPartitionColumns() {
makeSureInitialized();
- return new ArrayList<>(partitionNameToColumns.values());
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue)
value).getPartitionColumns())
+ .orElse(Collections.emptyList());
}
public TablePartitionValues getPartitionValues() {
makeSureInitialized();
- // Make sure to call it after initSchema() completes
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ if (!schemaCacheValue.isPresent()) {
+ return new TablePartitionValues();
+ }
+ Table odpsTable = ((MaxComputeSchemaCacheValue)
schemaCacheValue.get()).getOdpsTable();
String projectName = odpsTable.getProject();
String tableName = odpsTable.getName();
MaxComputeMetadataCache metadataCache =
Env.getCurrentEnv().getExtMetaCacheMgr()
.getMaxComputeMetadataCache(catalog.getId());
return metadataCache.getCachedPartitionValues(
- new MaxComputeCacheKey(projectName, tableName), key ->
loadPartitionValues(key));
+ new MaxComputeCacheKey(projectName, tableName),
+ key -> loadPartitionValues((MaxComputeSchemaCacheValue)
schemaCacheValue.get()));
}
- private TablePartitionValues loadPartitionValues(MaxComputeCacheKey key) {
+ private TablePartitionValues
loadPartitionValues(MaxComputeSchemaCacheValue schemaCacheValue) {
+ List<String> partitionSpecs = schemaCacheValue.getPartitionSpecs();
+ List<Type> partitionTypes = schemaCacheValue.getPartitionTypes();
TablePartitionValues partitionValues = new TablePartitionValues();
partitionValues.addPartitions(partitionSpecs,
partitionSpecs.stream()
@@ -154,21 +160,19 @@ public class MaxComputeExternalTable extends
ExternalTable {
}
@Override
- public List<Column> initSchema() {
+ public Optional<SchemaCacheValue> initSchema() {
// this method will be called at semantic parsing.
makeSureInitialized();
+ Table odpsTable = ((MaxComputeExternalCatalog)
catalog).getClient().tables().get(name);
List<com.aliyun.odps.Column> columns =
odpsTable.getSchema().getColumns();
- List<Column> result = Lists.newArrayListWithCapacity(columns.size());
+ List<Column> schema = Lists.newArrayListWithCapacity(columns.size());
for (com.aliyun.odps.Column field : columns) {
- result.add(new Column(field.getName(),
mcTypeToDorisType(field.getTypeInfo()), true, null,
+ schema.add(new Column(field.getName(),
mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
- result.addAll(partitionNameToColumns.values());
- return result;
- }
- private void initTablePartitions() {
List<com.aliyun.odps.Column> partitionColumns =
odpsTable.getSchema().getPartitionColumns();
+ List<String> partitionSpecs;
if (!partitionColumns.isEmpty()) {
partitionSpecs = odpsTable.getPartitions().stream()
.map(e -> e.getPartitionSpec().toString(false, true))
@@ -177,17 +181,21 @@ public class MaxComputeExternalTable extends
ExternalTable {
partitionSpecs = ImmutableList.of();
}
// sort partition columns to align partitionTypes and partitionName.
- partitionNameToColumns = new LinkedHashMap<>();
+ Map<String, Column> partitionNameToColumns = Maps.newHashMap();
for (com.aliyun.odps.Column partColumn : partitionColumns) {
Column dorisCol = new Column(partColumn.getName(),
mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
true, partColumn.getComment(), true, -1);
partitionNameToColumns.put(dorisCol.getName(), dorisCol);
}
- partitionTypes = partitionNameToColumns.values()
+ List<Type> partitionTypes = partitionNameToColumns.values()
.stream()
.map(Column::getType)
.collect(Collectors.toList());
+
+ schema.addAll(partitionNameToColumns.values());
+ return Optional.of(new MaxComputeSchemaCacheValue(schema, odpsTable,
partitionSpecs, partitionNameToColumns,
+ partitionTypes));
}
private Type mcTypeToDorisType(TypeInfo typeInfo) {
@@ -295,6 +303,8 @@ public class MaxComputeExternalTable extends ExternalTable {
public Table getOdpsTable() {
makeSureInitialized();
- return odpsTable;
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((MaxComputeSchemaCacheValue)
value).getOdpsTable())
+ .orElse(null);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java
new file mode 100644
index 00000000000..b8337d96120
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/MaxComputeSchemaCacheValue.java
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.maxcompute;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.SchemaCacheValue;
+
+import com.aliyun.odps.Table;
+import com.google.common.collect.Lists;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Getter
+@Setter
+public class MaxComputeSchemaCacheValue extends SchemaCacheValue {
+ private Table odpsTable;
+ private List<String> partitionSpecs;
+ private Map<String, Column> partitionNameToColumns;
+ private List<Type> partitionTypes;
+
+ public MaxComputeSchemaCacheValue(List<Column> schema, Table odpsTable,
List<String> partitionSpecs,
+ Map<String, Column> partitionNameToColumns, List<Type>
partitionTypes) {
+ super(schema);
+ this.odpsTable = odpsTable;
+ this.partitionSpecs = partitionSpecs;
+ this.partitionNameToColumns = partitionNameToColumns;
+ this.partitionTypes = partitionTypes;
+ }
+
+ public Set<String> getPartitionColNames() {
+ return partitionNameToColumns.keySet();
+ }
+
+ public List<Column> getPartitionColumns() {
+ return Lists.newArrayList(partitionNameToColumns.values());
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
index 023396670d8..da8f068dfd4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/metacache/MetaCache.java
@@ -49,14 +49,26 @@ public class MetaCache<T> {
RemovalListener<String, Optional<T>> removalListener) {
this.name = name;
- CacheFactory cacheFactory = new CacheFactory(
+ // ATTN:
+ // The refreshAfterWriteSec is only used for metaObjCache, not for
namesCache.
+ // Because namesCache need to be refreshed at interval so that user
can get the latest meta list.
+ // But metaObjCache does not need to be refreshed at interval, because
the object is actually not
+ // from remote datasource, it is just a local generated object to
represent the meta info.
+ // So it only need to be expired after specified duration.
+ CacheFactory namesCacheFactory = new CacheFactory(
expireAfterWriteSec,
refreshAfterWriteSec,
maxSize,
true,
null);
- namesCache = cacheFactory.buildCache(namesCacheLoader, null, executor);
- metaObjCache = cacheFactory.buildCache(metaObjCacheLoader,
removalListener, executor);
+ CacheFactory objCacheFactory = new CacheFactory(
+ expireAfterWriteSec,
+ OptionalLong.empty(),
+ maxSize,
+ true,
+ null);
+ namesCache = namesCacheFactory.buildCache(namesCacheLoader, null,
executor);
+ metaObjCache = objCacheFactory.buildCache(metaObjCacheLoader,
removalListener, executor);
}
public List<String> listNames() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
index 7d870f36059..d9e43bdd6cf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalTable.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
@@ -44,14 +45,13 @@ import org.apache.paimon.types.RowType;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
public class PaimonExternalTable extends ExternalTable {
private static final Logger LOG =
LogManager.getLogger(PaimonExternalTable.class);
- private Table originTable = null;
-
public PaimonExternalTable(long id, String name, String dbName,
PaimonExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.PAIMON_EXTERNAL_TABLE);
}
@@ -63,23 +63,20 @@ public class PaimonExternalTable extends ExternalTable {
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
- originTable = ((PaimonExternalCatalog)
catalog).getPaimonTable(dbName, name);
- schemaUpdateTime = System.currentTimeMillis();
objectCreated = true;
}
}
- public Table getOriginTable() {
+ public Table getPaimonTable() {
makeSureInitialized();
- return originTable;
+ Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
+ return schemaCacheValue.map(value -> ((PaimonSchemaCacheValue)
value).getPaimonTable()).orElse(null);
}
@Override
- public List<Column> initSchema() {
- //init schema need update lastUpdateTime and get latest schema
- objectCreated = false;
- Table table = getOriginTable();
- TableSchema schema = ((FileStoreTable) table).schema();
+ public Optional<SchemaCacheValue> initSchema() {
+ Table paimonTable = ((PaimonExternalCatalog)
catalog).getPaimonTable(dbName, name);
+ TableSchema schema = ((FileStoreTable) paimonTable).schema();
List<DataField> columns = schema.fields();
List<Column> tmpSchema =
Lists.newArrayListWithCapacity(columns.size());
for (DataField field : columns) {
@@ -87,7 +84,7 @@ public class PaimonExternalTable extends ExternalTable {
paimonTypeToDorisType(field.type()), true, null, true,
field.description(), true,
field.id()));
}
- return tmpSchema;
+ return Optional.of(new PaimonSchemaCacheValue(tmpSchema, paimonTable));
}
private Type
paimonPrimitiveTypeToDorisType(org.apache.paimon.types.DataType dataType) {
@@ -180,7 +177,13 @@ public class PaimonExternalTable extends ExternalTable {
makeSureInitialized();
try {
long rowCount = 0;
- List<Split> splits =
originTable.newReadBuilder().newScan().plan().splits();
+ Optional<SchemaCacheValue> schemaCacheValue =
getSchemaCacheValue();
+ Table paimonTable = schemaCacheValue.map(value ->
((PaimonSchemaCacheValue) value).getPaimonTable())
+ .orElse(null);
+ if (paimonTable == null) {
+ return -1;
+ }
+ List<Split> splits =
paimonTable.newReadBuilder().newScan().plan().splits();
for (Split split : splits) {
rowCount += split.rowCount();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
new file mode 100644
index 00000000000..aaaefe7f32d
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonSchemaCacheValue.java
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.paimon;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.datasource.SchemaCacheValue;
+
+import org.apache.paimon.table.Table;
+
+import java.util.List;
+
+public class PaimonSchemaCacheValue extends SchemaCacheValue {
+
+ private Table paimonTable;
+
+ public PaimonSchemaCacheValue(List<Column> schema, Table paimonTable) {
+ super(schema);
+ this.paimonTable = paimonTable;
+ }
+
+ public Table getPaimonTable() {
+ return paimonTable;
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
index a405aa92ea6..9ac44537e8a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonSource.java
@@ -39,7 +39,7 @@ public class PaimonSource {
public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,
Map<String, ColumnRange> columnNameToRange) {
this.paimonExtTable = table;
- this.originTable = paimonExtTable.getOriginTable();
+ this.originTable = paimonExtTable.getPaimonTable();
this.desc = desc;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java
index 99f0238c170..6da0981b97e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/test/TestExternalTable.java
@@ -17,15 +17,15 @@
package org.apache.doris.datasource.test;
-import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.util.List;
+import java.util.Optional;
/**
* TestExternalTable is a table for unit test.
@@ -53,7 +53,7 @@ public class TestExternalTable extends ExternalTable {
}
@Override
- public List<Column> initSchema() {
- return ((TestExternalCatalog) catalog).mockedSchema(dbName, name);
+ public Optional<SchemaCacheValue> initSchema() {
+ return Optional.of(new SchemaCacheValue(((TestExternalCatalog)
catalog).mockedSchema(dbName, name)));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
index c3cdf8a955a..3d783f28cb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java
@@ -101,8 +101,8 @@ public class AcceptListener implements
ChannelListener<AcceptingChannel<StreamCo
}
context.setStartTime();
int userQueryTimeout =
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser());
- if (userQueryTimeout <= 0) {
- LOG.warn("Connection set query timeout to {}",
+ if (userQueryTimeout <= 0 && LOG.isDebugEnabled()) {
+ LOG.debug("Connection set query timeout to {}",
context.getSessionVariable().getQueryTimeoutS());
}
context.setUserQueryTimeout(userQueryTimeout);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]