This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new e2e3dfef7cb [fix](multi-catalog) add max compute custom odps and
tunnel url #31390 (#31925)
e2e3dfef7cb is described below
commit e2e3dfef7cb812922d10218d8a1804927aef9b7d
Author: slothever <[email protected]>
AuthorDate: Thu Mar 7 21:41:30 2024 +0800
[fix](multi-catalog) add max compute custom odps and tunnel url #31390
(#31925)
---
be/src/runtime/descriptors.cpp | 2 ++
be/src/runtime/descriptors.h | 4 +++
.../exec/format/table/max_compute_jni_reader.cpp | 2 ++
.../en/docs/lakehouse/multi-catalog/max-compute.md | 25 +++++++++++++
.../docs/lakehouse/multi-catalog/max-compute.md | 26 ++++++++++++++
.../doris/maxcompute/MaxComputeJniScanner.java | 5 ++-
.../doris/maxcompute/MaxComputeTableScan.java | 37 ++++++++++++++-----
.../catalog/external/MaxComputeExternalTable.java | 8 ++---
.../datasource/MaxComputeExternalCatalog.java | 41 +++++++++++++++++-----
.../property/constants/MCProperties.java | 2 ++
gensrc/thrift/Descriptors.thrift | 2 ++
11 files changed, 131 insertions(+), 23 deletions(-)
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index a73f05f326b..2c532dd1d58 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -190,6 +190,8 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const
TTableDescriptor& tde
_region(tdesc.mcTable.region),
_project(tdesc.mcTable.project),
_table(tdesc.mcTable.table),
+ _odps_url(tdesc.mcTable.odps_url),
+ _tunnel_url(tdesc.mcTable.tunnel_url),
_access_key(tdesc.mcTable.access_key),
_secret_key(tdesc.mcTable.secret_key),
_public_access(tdesc.mcTable.public_access) {}
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 015e0e60607..36196461b30 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -235,6 +235,8 @@ public:
const std::string region() const { return _region; }
const std::string project() const { return _project; }
const std::string table() const { return _table; }
+ const std::string odps_url() const { return _odps_url; }
+ const std::string tunnel_url() const { return _tunnel_url; }
const std::string access_key() const { return _access_key; }
const std::string secret_key() const { return _secret_key; }
const std::string public_access() const { return _public_access; }
@@ -243,6 +245,8 @@ private:
std::string _region;
std::string _project;
std::string _table;
+ std::string _odps_url;
+ std::string _tunnel_url;
std::string _access_key;
std::string _secret_key;
std::string _public_access;
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index 7ba714eedd5..cda5c716e72 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -66,6 +66,8 @@ MaxComputeJniReader::MaxComputeJniReader(const
MaxComputeTableDescriptor* mc_des
index++;
}
std::map<String, String> params = {{"region", _table_desc->region()},
+ {"odps_url", _table_desc->odps_url()},
+ {"tunnel_url",
_table_desc->tunnel_url()},
{"access_key",
_table_desc->access_key()},
{"secret_key",
_table_desc->secret_key()},
{"project", _table_desc->project()},
diff --git a/docs/en/docs/lakehouse/multi-catalog/max-compute.md
b/docs/en/docs/lakehouse/multi-catalog/max-compute.md
index a2f141df70c..3001b480b34 100644
--- a/docs/en/docs/lakehouse/multi-catalog/max-compute.md
+++ b/docs/en/docs/lakehouse/multi-catalog/max-compute.md
@@ -57,4 +57,29 @@ Pay-as-you-go quota has limited concurrency and usage. For
additional resources,
Consistent with Hive Catalog, please refer to the **column type mapping**
section in [Hive Catalog](./hive.md).
+## User-defined service address
+
+By default, Max Compute Catalog generates a public network endpoint from the
`mc.region` property.
+
+In addition to default endpoint addresses, Max Compute Catalog also supports
custom service addresses in properties.
+
+Use the following properties:
+* `mc.odps_endpoint`: Max Compute Endpoint.
+* `mc.tunnel_endpoint`: Tunnel Endpoint. Max Compute Catalog uses the Tunnel
SDK to obtain data.
+
+For more information about the Max Compute Endpoint and Tunnel Endpoint that are
used in different regions and network connection modes, see
[Endpoint](https://www.alibabacloud.com/help/en/maxcompute/user-guide/endpoints).
+
+For example:
+
+```sql
+CREATE CATALOG mc PROPERTIES (
+ "type" = "max_compute",
+ "mc.region" = "cn-beijing",
+ "mc.default.project" = "your-project",
+ "mc.access_key" = "ak",
+ "mc.secret_key" = "sk",
+ "mc.odps_endpoint" =
"http://service.cn-beijing.maxcompute.aliyun-inc.com/api",
+ "mc.tunnel_endpoint" = "http://dt.cn-beijing.maxcompute.aliyun-inc.com"
+);
+```
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md
b/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md
index e4b6eacb5f4..8d4f3ab2ead 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/max-compute.md
@@ -57,4 +57,30 @@ CREATE CATALOG mc PROPERTIES (
和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。
+## 自定义服务地址
+
+默认情况下,Max Compute Catalog根据region去默认生成公网的endpoint。
+
+除了默认的endpoint地址外,Max Compute Catalog也支持在属性中自定义服务地址。
+
+使用以下两个属性:
+* `mc.odps_endpoint`:Max Compute Endpoint。
+* `mc.tunnel_endpoint`: Tunnel Endpoint,Max Compute Catalog使用Tunnel SDK获取数据。
+
+Max Compute Endpoint和Tunnel
Endpoint的配置请参见[各地域及不同网络连接方式下的Endpoint](https://help.aliyun.com/zh/maxcompute/user-guide/endpoints)
+
+示例:
+
+```sql
+CREATE CATALOG mc PROPERTIES (
+ "type" = "max_compute",
+ "mc.region" = "cn-beijing",
+ "mc.default.project" = "your-project",
+ "mc.access_key" = "ak",
+ "mc.secret_key" = "sk",
+ "mc.odps_endpoint" =
"http://service.cn-beijing.maxcompute.aliyun-inc.com/api",
+ "mc.tunnel_endpoint" = "http://dt.cn-beijing.maxcompute.aliyun-inc.com"
+);
+```
+
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
index f4a8a9c8fc6..95bf748658b 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeJniScanner.java
@@ -58,6 +58,8 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String TABLE = "table";
private static final String ACCESS_KEY = "access_key";
private static final String SECRET_KEY = "secret_key";
+ private static final String ODPS_URL = "odps_url";
+ private static final String TUNNEL_URL = "tunnel_url";
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
@@ -122,7 +124,8 @@ public class MaxComputeJniScanner extends JniScanner {
String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY),
"required property '" + ACCESS_KEY + "'.");
String secretKey = Objects.requireNonNull(params.get(SECRET_KEY),
"required property '" + SECRET_KEY + "'.");
boolean enablePublicAccess =
Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
- return new MaxComputeTableScan(region, project, table, accessKey,
secretKey, enablePublicAccess);
+ return new MaxComputeTableScan(params.get(ODPS_URL),
params.get(TUNNEL_URL), region, project, table,
+ accessKey, secretKey, enablePublicAccess);
}
public String tableUniqKey() {
diff --git
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
index c0fa40dae46..0de1cb17e79 100644
---
a/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
+++
b/fe/be-java-extensions/max-compute-scanner/src/main/java/org/apache/doris/maxcompute/MaxComputeTableScan.java
@@ -23,6 +23,7 @@ import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
+import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
@@ -39,20 +40,40 @@ public class MaxComputeTableScan {
private volatile long readRows = 0;
private long totalRows = 0;
- public MaxComputeTableScan(String region, String project, String table,
+ public MaxComputeTableScan(String odpsUrl, String tunnelUrl, String
region, String project, String table,
String accessKey, String secretKey, boolean
enablePublicAccess) {
this.project = project;
this.table = table;
odps = new Odps(new AliyunAccount(accessKey, secretKey));
- String odpsUrl = odpsUrlTemplate.replace("{}", region);
- String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
- if (enablePublicAccess) {
- odpsUrl = odpsUrl.replace("-inc", "");
- tunnelUrl = tunnelUrl.replace("-inc", "");
- }
- odps.setEndpoint(odpsUrl);
+ setOdpsUrl(odpsUrl, region, enablePublicAccess);
odps.setDefaultProject(this.project);
tunnel = new TableTunnel(odps);
+ setTunnelUrl(tunnelUrl, region, enablePublicAccess);
+ }
+
+ private void setOdpsUrl(String defaultOdpsUrl, String region, boolean
enablePublicAccess) {
+ String odpsUrl;
+ if (StringUtils.isNotEmpty(defaultOdpsUrl)) {
+ odpsUrl = defaultOdpsUrl;
+ } else {
+ odpsUrl = odpsUrlTemplate.replace("{}", region);
+ if (enablePublicAccess) {
+ odpsUrl = odpsUrl.replace("-inc", "");
+ }
+ }
+ odps.setEndpoint(odpsUrl);
+ }
+
+ private void setTunnelUrl(String defaultTunnelUrl, String region, boolean
enablePublicAccess) {
+ String tunnelUrl;
+ if (StringUtils.isNotEmpty(defaultTunnelUrl)) {
+ tunnelUrl = defaultTunnelUrl;
+ } else {
+ tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+ if (enablePublicAccess) {
+ tunnelUrl = tunnelUrl.replace("-inc", "");
+ }
+ }
tunnel.setEndpoint(tunnelUrl);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
index 5c25cf6cce0..8be885ae058 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/MaxComputeExternalTable.java
@@ -271,6 +271,8 @@ public class MaxComputeExternalTable extends ExternalTable {
tMcTable.setRegion(mcCatalog.getRegion());
tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
+ tMcTable.setOdpsUrl(mcCatalog.getOdpsUrl());
+ tMcTable.setTunnelUrl(mcCatalog.getTunnelUrl());
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
// use mc project as dbName
tMcTable.setProject(dbName);
@@ -285,10 +287,4 @@ public class MaxComputeExternalTable extends ExternalTable
{
makeSureInitialized();
return odpsTable;
}
-
- @Override
- public String getMysqlType() {
- return "BASE TABLE";
- }
}
-
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
index b361d0c8144..216a70cfed6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/MaxComputeExternalCatalog.java
@@ -31,6 +31,7 @@ import com.aliyun.odps.tunnel.TableTunnel;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Iterator;
@@ -51,6 +52,8 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
private boolean enablePublicAccess;
private static final String odpsUrlTemplate =
"http://service.{}.maxcompute.aliyun-inc.com/api";
private static final String tunnelUrlTemplate =
"http://dt.{}.maxcompute.aliyun-inc.com";
+ private static String odpsUrl;
+ private static String tunnelUrl;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.REGION,
MCProperties.PROJECT
@@ -60,6 +63,8 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
String comment) {
super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment);
catalogProperty = new CatalogProperty(resource, props);
+ odpsUrl = props.getOrDefault(MCProperties.ODPS_ENDPOINT, "");
+ tunnelUrl = props.getOrDefault(MCProperties.TUNNEL_SDK_ENDPOINT, "");
}
@Override
@@ -88,16 +93,28 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
Account account = new AliyunAccount(accessKey, secretKey);
this.odps = new Odps(account);
enablePublicAccess =
Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false"));
- String odpsUrl = odpsUrlTemplate.replace("{}", region);
- if (enablePublicAccess) {
- odpsUrl = odpsUrl.replace("-inc", "");
- }
- odps.setEndpoint(odpsUrl);
+ setOdpsUrl(region);
odps.setDefaultProject(defaultProject);
tunnel = new TableTunnel(odps);
- String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
- if (enablePublicAccess) {
- tunnelUrl = tunnelUrl.replace("-inc", "");
+ setTunnelUrl(region);
+ }
+
+ private void setOdpsUrl(String region) {
+ if (StringUtils.isEmpty(odpsUrl)) {
+ odpsUrl = odpsUrlTemplate.replace("{}", region);
+ if (enablePublicAccess) {
+ odpsUrl = odpsUrl.replace("-inc", "");
+ }
+ }
+ odps.setEndpoint(odpsUrl);
+ }
+
+ private void setTunnelUrl(String region) {
+ if (StringUtils.isEmpty(tunnelUrl)) {
+ tunnelUrl = tunnelUrlTemplate.replace("{}", region);
+ if (enablePublicAccess) {
+ tunnelUrl = tunnelUrl.replace("-inc", "");
+ }
}
tunnel.setEndpoint(tunnelUrl);
}
@@ -211,4 +228,12 @@ public class MaxComputeExternalCatalog extends
ExternalCatalog {
}
}
}
+
+ public String getOdpsUrl() {
+ return odpsUrl;
+ }
+
+ public String getTunnelUrl() {
+ return tunnelUrl;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
index 32c8534ace1..e3059cee4d2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/MCProperties.java
@@ -31,6 +31,8 @@ public class MCProperties extends BaseProperties {
public static final String SECRET_KEY = "mc.secret_key";
public static final String SESSION_TOKEN = "mc.session_token";
public static final String PUBLIC_ACCESS = "mc.public_access";
+ public static final String ODPS_ENDPOINT = "mc.odps_endpoint";
+ public static final String TUNNEL_SDK_ENDPOINT = "mc.tunnel_endpoint";
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY,
SESSION_TOKEN);
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 6507d04c2dd..884eb2249c4 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -333,6 +333,8 @@ struct TMCTable {
4: optional string access_key
5: optional string secret_key
6: optional string public_access
+ 7: optional string odps_url
+ 8: optional string tunnel_url
}
// "Union" of all table types.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]