This is an automated email from the ASF dual-hosted git repository. hubgeter pushed a commit to branch mc-test-branch-4.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7f454af1c311984ac02ce8d78eb37504d6671a63 Author: zgxme <[email protected]> AuthorDate: Thu Apr 16 21:32:08 2026 +0800 [test](regression) Extend MaxCompute regression coverage Issue Number: None Related PR: None Problem Summary: Extend MaxCompute regression coverage by adding catalog/property validation, namespace schema checks, cross-catalog join and insert cases, create-table negative cases, and large-field and strict-consistency write-path scenarios. None - Test: Not run in this workspace - Regression test changes only; local MaxCompute regression environment was not re-run here - Behavior changed: No - Does this need documentation: No --- .../test_external_catalog_maxcompute.out | 139 ++++- .../external_table_p2/maxcompute/test_mc_join.out | 19 + .../maxcompute/write/test_mc_write_insert.out | 5 + .../test_external_catalog_maxcompute.groovy | 604 +++++++++++++++++++++ ...ernal_catalog_maxcompute_fault_tolerance.groovy | 253 +++++++++ .../test_max_compute_create_table.groovy | 64 ++- .../maxcompute/test_mc_join.groovy | 183 +++++++ .../maxcompute/write/test_mc_write_ctas.groovy | 4 +- .../maxcompute/write/test_mc_write_insert.groovy | 131 +++++ .../write/test_mc_write_large_data.groovy | 15 + .../write/test_mc_write_static_partitions.groovy | 21 + 11 files changed, 1417 insertions(+), 21 deletions(-) diff --git a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out index af7b8dd3f12..0b7ca458a3d 100644 --- a/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out +++ b/regression-test/data/external_table_p2/maxcompute/test_external_catalog_maxcompute.out @@ -3,9 +3,9 @@ 5 -- !q2 -- -3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA 
-6.0 6.75 -4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4.0 5.00 -5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7.0 8.00 +3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6 6.75 +4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4 5.00 +5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7 8.00 -- !q3 -- false -1 -1 -1 @@ -82,6 +82,129 @@ mc_bigint bigint Yes true \N mc_string text Yes true \N dt text Yes true \N +-- !q01_column_prune_web_site_result -- +3 + +-- !q01_column_prune_mc_parts_result -- +1003 + +-- !q02_non_partition_eq_result -- +3 WS0003 + +-- !q02_non_partition_in_result -- +WS0001 201 +WS0003 203 +WS0005 205 + +-- !q02_non_partition_is_null_result -- +2 +3 +5 + +-- !q02_non_partition_between_result -- +WS0002 202 +WS0003 203 +WS0004 204 + +-- !q02_non_partition_like_result -- +WS0003 + +-- !n05_web_site_count -- +5 + +-- !n05_web_site_slice -- +3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6 6.75 +4 WS0004 
2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4 5.00 +5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7 8.00 + +-- !n05_mc_parts_eq -- +1003 Sample data 3 2023-08-03 + +-- !n05_mc_parts_or -- +1001 Sample data 1 2023-08-01 +1002 Sample data 2 2023-08-02 +1005 Sample data 5 2023-08-05 + +-- !n05_multi_partition_datetime -- +3 2023 08 03 +3 2023 08 03 +4 2023 08 04 +4 2023 08 04 +5 2023 08 05 +5 2023 08 05 + +-- !n05_web_site_count -- +5 + +-- !n05_web_site_slice -- +3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6 6.75 +4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4 5.00 +5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7 8.00 + +-- !n05_mc_parts_eq -- +1003 Sample data 3 2023-08-03 + +-- !n05_mc_parts_or -- +1001 Sample data 1 2023-08-01 +1002 Sample data 2 2023-08-02 +1005 Sample data 5 2023-08-05 + +-- !n05_multi_partition_datetime -- +3 2023 08 03 +3 2023 08 03 +4 2023 08 04 +4 2023 08 04 +5 2023 08 05 +5 2023 08 05 + +-- !n05_web_site_count -- +5 + +-- !n05_web_site_slice -- +3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and 
products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6 6.75 +4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4 5.00 +5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7 8.00 + +-- !n05_mc_parts_eq -- +1003 Sample data 3 2023-08-03 + +-- !n05_mc_parts_or -- +1001 Sample data 1 2023-08-01 +1002 Sample data 2 2023-08-02 +1005 Sample data 5 2023-08-05 + +-- !n05_multi_partition_datetime -- +3 2023 08 03 +3 2023 08 03 +4 2023 08 04 +4 2023 08 04 +5 2023 08 05 +5 2023 08 05 + +-- !n05_web_site_count -- +5 + +-- !n05_web_site_slice -- +3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6 6.75 +4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4 5.00 +5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7 8.00 + +-- !n05_mc_parts_eq -- +1003 Sample data 3 2023-08-03 + +-- !n05_mc_parts_or -- +1001 Sample data 1 2023-08-01 +1002 Sample data 2 2023-08-02 +1005 Sample data 5 2023-08-05 + +-- !n05_multi_partition_datetime -- +3 2023 08 03 +3 2023 08 03 +4 2023 08 04 +4 2023 08 04 +5 2023 08 05 +5 2023 08 05 + -- !replay_q6 -- 1003 Sample data 3 2023-08-03 1004 Sample data 4 
2023-08-04 @@ -260,9 +383,9 @@ audit_flag text Yes true \N -- !where_3 -- -- !where_4 -- -1 WS0001 2023-01-01 2023-12-31 Example Web Site 1 20230101 20231231 E-commerce John Doe 101 Retail Online retail website Jane Smith 201 Example Company 1234 Main Street Apt Unit 101 Metropolis County NY 12345 USA -5.0 8.25 -2 WS0002 2023-02-01 2023-11-30 Example Web Site 2 20230201 20231130 Technology Alice Johnson 102 Tech Tech news and reviews Bob Brown 202 Tech Innovations 5678 Tech Avenue Suite Suite 200 Gotham County CA 67890 USA -8.0 7.50 -3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6.0 6.75 -4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4.0 5.00 -5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7.0 8.00 +1 WS0001 2023-01-01 2023-12-31 Example Web Site 1 20230101 20231231 E-commerce John Doe 101 Retail Online retail website Jane Smith 201 Example Company 1234 Main Street Apt Unit 101 Metropolis County NY 12345 USA -5 8.25 +2 WS0002 2023-02-01 2023-11-30 Example Web Site 2 20230201 20231130 Technology Alice Johnson 102 Tech Tech news and reviews Bob Brown 202 Tech Innovations 5678 Tech Avenue Suite Suite 200 Gotham County CA 67890 USA -8 7.50 +3 WS0003 2023-03-01 2023-10-31 Example Web Site 3 20230301 20231031 Healthcare Robert White 103 Health Healthcare services and products Emily Green 203 Health Corp 9101 Health Drive Floor Floor 3 Star City County TX 23456 USA -6 6.75 +4 WS0004 2023-04-01 2023-09-30 Example Web Site 4 20230401 20230930 
Education David Black 104 EdTech Educational technology platform Fiona Grey 204 Edu Tech 1122 Education Lane Building Building 1 Smallville County FL 34567 USA -4 5.00 +5 WS0005 2023-05-01 2023-08-31 Example Web Site 5 20230501 20230831 Travel Sophia Blue 105 Travel Travel and booking services Daniel Red 205 Travel Inc 3344 Tourist Street Unit Unit 5 Metropolis County WA 45678 USA -7 8.00 diff --git a/regression-test/data/external_table_p2/maxcompute/test_mc_join.out b/regression-test/data/external_table_p2/maxcompute/test_mc_join.out new file mode 100644 index 00000000000..c6dd4b66a4e --- /dev/null +++ b/regression-test/data/external_table_p2/maxcompute/test_mc_join.out @@ -0,0 +1,19 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !j01_mc_join_internal -- +1 Alice finance +2 Bob it +4 David ops + +-- !j02_mc_join_hive -- +1 Alice north +2 Bob east + +-- !j02_mc_join_iceberg -- +1 Alice gold +3 Carol silver + +-- !j03_mc_join_cross_catalog -- +1 Alice 10 +2 Bob 20 +3 Carol 30 + diff --git a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_insert.out b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_insert.out index 9c7a1a21807..6d6086ec6e1 100644 --- a/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_insert.out +++ b/regression-test/data/external_table_p2/maxcompute/write/test_mc_write_insert.out @@ -18,3 +18,8 @@ 2 batch2 3 batch3 +-- !cross_catalog_insert -- +11 internal_alice 110.5 +12 internal_bob 120.3 +13 internal_charlie 130.7 + diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy index 827ed963873..d36d4ebba39 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy +++ 
b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute.groovy @@ -332,8 +332,56 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot String ak = context.config.otherConfigs.get("ak") String sk = context.config.otherConfigs.get("sk"); String mc_db = "mc_datalake" + String mc_schema_project = "mc_datalake_schema" String mc_catalog_name = "test_external_mc_catalog" + String mc_optional_catalog_name = "${mc_catalog_name}_optional" + String mc_alter_catalog_name = "${mc_optional_catalog_name}_split_strategy" + String mc_namespace_schema_catalog_name = "${mc_optional_catalog_name}_namespace_schema" + String mc_drop_catalog_name = "${mc_catalog_name}_drop" + String mc_invalid_ak_catalog_name = "${mc_catalog_name}_invalid_ak" + String mc_invalid_endpoint_catalog_name = "${mc_catalog_name}_invalid_endpoint" + String mc_invalid_project_catalog_name = "${mc_catalog_name}_invalid_project" + def assertCatalogExists = { String catalogName -> + List<List<Object>> catalogs = sql """show catalogs""" + assertTrue(catalogs.any { it[1].toString() == catalogName }, + "catalog ${catalogName} should exist") + } + + def assertCatalogNotExists = { String catalogName -> + List<List<Object>> catalogs = sql """show catalogs""" + assertFalse(catalogs.any { it[1].toString() == catalogName }, + "catalog ${catalogName} should not exist") + } + + def assertShowCreateCatalogContains = { String catalogName, List<String> fragments -> + def result = sql """show create catalog ${catalogName};""" + assertEquals(1, result.size()) + String ddl = result[0][1].toString() + fragments.each { fragment -> + assertTrue(ddl.contains(fragment), + "show create catalog ${catalogName} should contain '${fragment}', actual ddl: ${ddl}") + } + } + + def assertCatalogHasErrorMessage = { String catalogName -> + List<List<Object>> catalogs = sql """show catalogs""" + def target = catalogs.find { it[1].toString() == catalogName } + assertTrue(target != null, 
"catalog ${catalogName} should exist in show catalogs") + assertTrue(target[7] != null && !target[7].toString().trim().isEmpty(), + "catalog ${catalogName} should have non-empty error message, actual row: ${target}") + } + + def assertCatalogHasTable = { String catalogName, String dbName, String tableName -> + sql """switch `${catalogName}`;""" + sql """use `${dbName}`;""" + List<List<Object>> tables = sql """show tables""" + assertTrue(tables.any { it[0].toString() == tableName }, + "table ${tableName} should exist in ${catalogName}.${dbName}") + } + + // C-01: CREATE CATALOG with required properties: type, mc.default.project, + // mc.access_key, mc.secret_key, mc.endpoint sql """drop catalog if exists ${mc_catalog_name};""" sql """ create catalog if not exists ${mc_catalog_name} properties ( @@ -344,6 +392,344 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" ); """ + assertCatalogExists(mc_catalog_name) + assertShowCreateCatalogContains(mc_catalog_name, [ + "\"type\" = \"max_compute\"", + "\"mc.default.project\" = \"${mc_db}\"", + "\"mc.endpoint\" = \"http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api\"" + ]) + + Map<String, String> requiredCatalogProperties = new LinkedHashMap<>([ + "type": "max_compute", + "mc.default.project": "${mc_db}", + "mc.access_key": "${ak}", + "mc.secret_key": "${sk}", + "mc.endpoint": "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" + ]) + List<Map<String, String>> missingRequiredPropertyCases = [ + [ + "catalogName": "mc_missing_type", + "missingProperty": "type", + "errorMessage": "Missing property 'type' in properties" + ], + [ + "catalogName": "mc_missing_project", + "missingProperty": "mc.default.project", + "errorMessage": "Required property 'mc.default.project' is missing" + ], + [ + "catalogName": "mc_missing_endpoint", + "missingProperty": "mc.endpoint", + "errorMessage": "Required property 
'mc.endpoint' is missing" + ], + [ + "catalogName": "mc_missing_access_key", + "missingProperty": "mc.access_key", + "errorMessage": "Missing access key or secret key for AK/SK auth type" + ], + [ + "catalogName": "mc_missing_secret_key", + "missingProperty": "mc.secret_key", + "errorMessage": "Missing access key or secret key for AK/SK auth type" + ] + ] + def renderCatalogProperties = { Map<String, String> properties -> + properties.collect { key, value -> "\"${key}\" = \"${value}\"" }.join(",\n ") + } + def assertCreateCatalogMissingRequiredProperty = { Map<String, String> testCase -> + Map<String, String> properties = new LinkedHashMap<>(requiredCatalogProperties) + properties.remove(testCase.missingProperty) + test { + sql """drop catalog if exists ${testCase.catalogName};""" + sql """ + create catalog ${testCase.catalogName} properties ( + ${renderCatalogProperties(properties)} + ); + """ + exception testCase.errorMessage + } + } + missingRequiredPropertyCases.each(assertCreateCatalogMissingRequiredProperty) + + // C-02: CREATE CATALOG with each McOptionalProperties entry individually + List<Map<String, Object>> optionalCatalogCases = [ + [ + "catalogName": "${mc_optional_catalog_name}_tunnel_endpoint", + "optionalProperties": ["mc.tunnel_endpoint": "http://dt.cn-beijing-vpc.maxcompute.aliyun-inc.com"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_odps_endpoint", + "optionalProperties": ["mc.odps_endpoint": "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_quota", + "optionalProperties": ["mc.quota": "pay-as-you-go"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_split_strategy", + "optionalProperties": ["mc.split_strategy": "row_count"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_split_byte_size", + "optionalProperties": ["mc.split_byte_size": "536870912"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_split_row_count", + "optionalProperties": 
["mc.split_row_count": "2048"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_split_cross_partition", + "optionalProperties": ["mc.split_cross_partition": "false"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_connect_timeout", + "optionalProperties": ["mc.connect_timeout": "9"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_read_timeout", + "optionalProperties": ["mc.read_timeout": "19"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_retry_count", + "optionalProperties": ["mc.retry_count": "1"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_datetime_pushdown", + "optionalProperties": ["mc.datetime_predicate_push_down": "false"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_account_format", + "optionalProperties": ["mc.account_format": "id"] + ], + [ + // mc_datalake is a non-schema project. Keep it for the existing non-schema coverage. + // For mc.enable.namespace.schema=true, use mc_datalake_schema; using mc_datalake here is + // expected to fail. 
+ "catalogName": mc_namespace_schema_catalog_name, + "baseProperties": ["mc.default.project": mc_schema_project], + "optionalProperties": ["mc.enable.namespace.schema": "true"] + ], + [ + "catalogName": "${mc_optional_catalog_name}_max_field_size", + "optionalProperties": ["mc.max_field_size_bytes": "4194304"] + ] + ] + def assertCreateCatalogWithOptionalProperties = { Map<String, Object> testCase -> + Map<String, String> properties = new LinkedHashMap<>(requiredCatalogProperties) + Map<String, String> baseProperties = testCase.baseProperties as Map<String, String> + if (baseProperties != null) { + properties.putAll(baseProperties) + } + Map<String, String> optionalProperties = testCase.optionalProperties as Map<String, String> + properties.putAll(optionalProperties) + sql """drop catalog if exists ${testCase.catalogName};""" + sql """ + create catalog if not exists ${testCase.catalogName} properties ( + ${renderCatalogProperties(properties)} + ); + """ + assertCatalogExists(testCase.catalogName as String) + assertShowCreateCatalogContains(testCase.catalogName as String, + optionalProperties.collect { key, value -> "\"${key}\" = \"${value}\"" }) + } + optionalCatalogCases.each(assertCreateCatalogWithOptionalProperties) + + String namespaceSchemaSuffix = java.util.UUID.randomUUID().toString().replace("-", "").substring(0, 8) + String namespaceSchemaEnabledDb = "mc_ns_db_${namespaceSchemaSuffix}" + String namespaceSchemaTableDb = "mc_ns_db_tbl_${namespaceSchemaSuffix}" + String namespaceSchemaEnabledTable = "mc_ns_tbl_${namespaceSchemaSuffix}" + String namespaceSchemaDisabledDb = "mc_ns_dbf_${namespaceSchemaSuffix}" + String namespaceSchemaProjectTable = "mc_ns_project_tbl_${namespaceSchemaSuffix}" + def cleanupNamespaceSchemaResources = { + try { + sql """switch internal""" + } catch (Throwable ignored) { + } + try { + sql """ + alter catalog ${mc_namespace_schema_catalog_name} set properties ( + "mc.enable.namespace.schema" = "true" + ); + """ + } catch (Throwable 
ignored) { + } + try { + sql """switch ${mc_namespace_schema_catalog_name};""" + } catch (Throwable ignored) { + } + try { + sql """drop database if exists ${namespaceSchemaEnabledDb} force""" + } catch (Throwable ignored) { + } + try { + sql """drop database if exists ${namespaceSchemaTableDb} force""" + } catch (Throwable ignored) { + } + try { + sql """switch internal""" + } catch (Throwable ignored) { + } + } + try { + sql """switch ${mc_namespace_schema_catalog_name};""" + sql """drop database if exists ${namespaceSchemaEnabledDb} force""" + sql """drop database if exists ${namespaceSchemaTableDb} force""" + sql """create database ${namespaceSchemaEnabledDb}""" + sql """refresh catalog ${mc_namespace_schema_catalog_name} properties ('invalid_cache'='true')""" + sql """switch ${mc_namespace_schema_catalog_name};""" + List<List<Object>> namespaceSchemaDatabases = + sql """show databases like '${namespaceSchemaEnabledDb}'""" + assertEquals(1, namespaceSchemaDatabases.size()) + sql """drop database ${namespaceSchemaEnabledDb} force""" + sql """refresh catalog ${mc_namespace_schema_catalog_name} properties ('invalid_cache'='true')""" + sql """switch ${mc_namespace_schema_catalog_name};""" + List<List<Object>> droppedNamespaceSchemaDatabases = + sql """show databases like '${namespaceSchemaEnabledDb}'""" + assertEquals(0, droppedNamespaceSchemaDatabases.size()) + test { + sql """use ${namespaceSchemaEnabledDb}""" + exception "Unknown database" + } + sql """create database ${namespaceSchemaTableDb}""" + sql """use ${namespaceSchemaTableDb}""" + sql """drop table if exists ${namespaceSchemaEnabledTable}""" + sql """ + create table ${namespaceSchemaEnabledTable} ( + id int + ) + """ + List<List<Object>> namespaceSchemaTables = sql """show tables like '${namespaceSchemaEnabledTable}'""" + assertEquals(1, namespaceSchemaTables.size()) + sql """switch internal""" + + sql """ + alter catalog ${mc_namespace_schema_catalog_name} set properties ( + 
"mc.enable.namespace.schema" = "false" + ); + """ + assertShowCreateCatalogContains(mc_namespace_schema_catalog_name, [ + "\"mc.enable.namespace.schema\" = \"false\"" + ]) + sql """switch ${mc_namespace_schema_catalog_name};""" + test { + sql """create database ${namespaceSchemaDisabledDb}""" + exception "Create database is not supported when mc.enable.namespace.schema is false" + } + sql """use ${mc_schema_project}""" + sql """drop table if exists ${namespaceSchemaProjectTable}""" + sql """ + create table ${namespaceSchemaProjectTable} ( + id int + ) + """ + List<List<Object>> namespaceSchemaProjectTables = + sql """show tables like '${namespaceSchemaProjectTable}'""" + assertEquals(1, namespaceSchemaProjectTables.size()) + sql """switch internal""" + } finally { + cleanupNamespaceSchemaResources() + } + + // C-03: ALTER CATALOG based on the C-02 split_strategy catalog + sql """ + alter catalog ${mc_alter_catalog_name} set properties ( + "mc.split_strategy" = "byte_size", + "mc.split_byte_size" = "268435456", + "mc.connect_timeout" = "11", + "mc.read_timeout" = "21", + "mc.retry_count" = "2", + "mc.split_cross_partition" = "true", + "mc.datetime_predicate_push_down" = "true" + ); + """ + assertShowCreateCatalogContains(mc_alter_catalog_name, [ + "\"mc.split_strategy\" = \"byte_size\"", + "\"mc.split_byte_size\" = \"268435456\"", + "\"mc.connect_timeout\" = \"11\"", + "\"mc.read_timeout\" = \"21\"", + "\"mc.retry_count\" = \"2\"", + "\"mc.split_cross_partition\" = \"true\"", + "\"mc.datetime_predicate_push_down\" = \"true\"" + ]) + assertCatalogHasTable(mc_alter_catalog_name, mc_db, "web_site") + sql """switch internal""" + + // C-05: DROP CATALOG + sql """drop catalog if exists ${mc_drop_catalog_name};""" + sql """ + create catalog if not exists ${mc_drop_catalog_name} properties ( + "type" = "max_compute", + "mc.default.project" = "${mc_db}", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = 
"http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" + ); + """ + assertCatalogExists(mc_drop_catalog_name) + sql """drop catalog ${mc_drop_catalog_name}""" + assertCatalogNotExists(mc_drop_catalog_name) + test { + sql """show create catalog ${mc_drop_catalog_name}""" + exception "No catalog found with name test_external_mc_catalog_drop" + } + + // C-06: Invalid credentials + sql """drop catalog if exists ${mc_invalid_ak_catalog_name};""" + sql """ + create catalog if not exists ${mc_invalid_ak_catalog_name} properties ( + "type" = "max_compute", + "mc.default.project" = "${mc_db}", + "mc.access_key" = "invalid_ak_for_regression", + "mc.secret_key" = "invalid_sk_for_regression", + "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" + ); + """ + test { + sql """show databases from ${mc_invalid_ak_catalog_name}""" + exception "errCode = 2" + } + // TODO: re-enable after show catalogs error message is stabilized. + // assertCatalogHasErrorMessage(mc_invalid_ak_catalog_name) + + // C-07: Invalid endpoint + sql """drop catalog if exists ${mc_invalid_endpoint_catalog_name};""" + sql """ + create catalog if not exists ${mc_invalid_endpoint_catalog_name} properties ( + "type" = "max_compute", + "mc.default.project" = "${mc_db}", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "http://127.0.0.1:1/api", + "mc.connect_timeout" = "1", + "mc.read_timeout" = "1", + "mc.retry_count" = "1" + ); + """ + test { + sql """show databases from ${mc_invalid_endpoint_catalog_name}""" + exception "errCode = 2" + } + // TODO: re-enable after show catalogs error message is stabilized. 
+ // assertCatalogHasErrorMessage(mc_invalid_endpoint_catalog_name) + + // C-08: Invalid project + sql """drop catalog if exists ${mc_invalid_project_catalog_name};""" + sql """ + create catalog if not exists ${mc_invalid_project_catalog_name} properties ( + "type" = "max_compute", + "mc.default.project" = "not_exist_project_for_regression", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api", + "mc.enable.namespace.schema" = "true" + ); + """ + test { + sql """show databases from ${mc_invalid_project_catalog_name}""" + exception "errCode = 2" + } + // TODO: re-enable after show catalogs error message is stabilized. + // assertCatalogHasErrorMessage(mc_invalid_project_catalog_name) // query data test def q01 = { @@ -364,12 +750,226 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot order_qt_q7 """ select * from mc_parts where dt < '2023-08-03' or (mc_bigint > 1003 and dt > '2023-08-04') order by mc_bigint, dt; """ qt_q8 """ desc mc_parts """ } + def qQ01 = { + explain { + sql """ + select web_site_sk + from web_site + where web_site_id = 'WS0003' + """ + verbose(true) + contains "OUTPUT EXPRS:" + contains "web_site_sk[#" + contains "web_site_id[#" + notContains "web_company_name[#" + } + explain { + sql """ + select mc_bigint + from mc_parts + where dt = '2023-08-03' + """ + verbose(true) + contains "OUTPUT EXPRS:" + contains "mc_bigint[#" + contains "dt[#" + notContains "mc_string[#" + } + order_qt_q01_column_prune_web_site_result """ + select web_site_sk + from web_site + where web_site_id = 'WS0003' + order by web_site_sk + """ + order_qt_q01_column_prune_mc_parts_result """ + select mc_bigint + from mc_parts + where dt = '2023-08-03' + order by mc_bigint + """ + } + def qQ02 = { + // Q-02: Non-partition predicate coverage. The scan node should retain predicates in explain output. 
+ explain { + sql """select * from web_site where web_site_id = 'WS0003'""" + contains "predicates:" + contains "web_site_id" + contains "= 'WS0003'" + } + explain { + sql """select * from web_site where web_site_id != 'WS0003'""" + contains "predicates:" + contains "web_site_id" + contains "!= 'WS0003'" + } + explain { + sql """select * from web_site where web_company_id > 203""" + contains "predicates:" + contains "web_company_id" + contains "> 203" + } + explain { + sql """select * from web_site where web_company_id < 203""" + contains "predicates:" + contains "web_company_id" + contains "< 203" + } + explain { + sql """select * from web_site where web_company_id >= 203""" + contains "predicates:" + contains "web_company_id" + contains ">= 203" + } + explain { + sql """select * from web_site where web_company_id <= 203""" + contains "predicates:" + contains "web_company_id" + contains "<= 203" + } + explain { + sql """select * from web_site where web_company_id in (201, 203, 205)""" + contains "predicates:" + contains "web_company_id" + contains "IN (201, 203, 205)" + } + explain { + sql """select * from web_site where web_company_id not in (201, 203, 205)""" + contains "predicates:" + contains "web_company_id" + contains "NOT IN (201, 203, 205)" + } + explain { + sql """select * from mc_test_null where col is null""" + contains "predicates:" + contains "col" + contains "IS NULL" + } + explain { + sql """select * from mc_test_null where col is not null""" + contains "predicates:" + contains "col" + contains "IS NOT NULL" + } + explain { + sql """select * from web_site where web_company_id between 202 and 204""" + contains "predicates:" + contains "web_company_id" + contains ">= 202" + contains "<= 204" + } + explain { + sql """select * from web_site where web_name like 'Example Web Site 3%'""" + contains "predicates:" + contains "web_name" + containsAny "LIKE 'Example Web Site 3%'" + containsAny "like 'Example Web Site 3%'" + } + 
order_qt_q02_non_partition_eq_result """ + select web_site_sk, web_site_id + from web_site + where web_site_id = 'WS0003' + order by web_site_sk + """ + order_qt_q02_non_partition_in_result """ + select web_site_id, web_company_id + from web_site + where web_company_id in (201, 203, 205) + order by web_company_id + """ + order_qt_q02_non_partition_is_null_result """ + select id + from mc_test_null + where col is null + order by id + """ + order_qt_q02_non_partition_between_result """ + select web_site_id, web_company_id + from web_site + where web_company_id between 202 and 204 + order by web_company_id + """ + order_qt_q02_non_partition_like_result """ + select web_site_id + from web_site + where web_name like 'Example Web Site 3%' + order by web_site_id + """ + } + def qN05 = { + qt_n05_web_site_count """ select count(*) from web_site """ + order_qt_n05_web_site_slice """ select * from web_site where web_site_id >= 'WS0003' order by web_site_id; """ + order_qt_n05_mc_parts_eq """ select * from mc_parts where dt = '2023-08-03' order by mc_bigint """ + order_qt_n05_mc_parts_or """ select * from mc_parts where dt < '2023-08-03' + or (mc_bigint > 1003 and dt > '2023-08-04') order by mc_bigint, dt; """ + order_qt_n05_multi_partition_datetime """ select pt, yy, mm, dd from multi_partitions + where pt >= 2 and create_time > '2023-08-03 03:11:00' order by pt, yy, mm, dd; """ + } sql """ switch `${mc_catalog_name}`; """ sql """ use `${mc_db}`; """ q01() q02() q03() + qQ01() + qQ02() + + // N-05: Query correctness should stay stable after altering catalog properties. 
+ List<Map<String, Object>> alterCatalogQueryCases = [ + [ + "caseName": "row_count_small", + "properties": new LinkedHashMap<String, String>([ + "mc.split_strategy": "row_count", + "mc.split_row_count": "1024", + "mc.split_cross_partition": "false", + "mc.datetime_predicate_push_down": "false", + "mc.max_field_size_bytes": "4194304" + ]) + ], + [ + "caseName": "row_count_large", + "properties": new LinkedHashMap<String, String>([ + "mc.split_strategy": "row_count", + "mc.split_row_count": "4096", + "mc.split_cross_partition": "true", + "mc.datetime_predicate_push_down": "true", + "mc.max_field_size_bytes": "8388608" + ]) + ], + [ + "caseName": "byte_size_small", + "properties": new LinkedHashMap<String, String>([ + "mc.split_strategy": "byte_size", + "mc.split_byte_size": "134217728", + "mc.split_cross_partition": "false", + "mc.datetime_predicate_push_down": "false", + "mc.max_field_size_bytes": "16777216" + ]) + ], + [ + "caseName": "byte_size_large", + "properties": new LinkedHashMap<String, String>([ + "mc.split_strategy": "byte_size", + "mc.split_byte_size": "268435456", + "mc.split_cross_partition": "true", + "mc.datetime_predicate_push_down": "true", + "mc.max_field_size_bytes": "8388608" + ]) + ] + ] + alterCatalogQueryCases.each { Map<String, Object> testCase -> + Map<String, String> properties = testCase.properties as Map<String, String> + sql """switch internal""" + sql """ + alter catalog ${mc_alter_catalog_name} set properties ( + ${renderCatalogProperties(properties)} + ); + """ + assertShowCreateCatalogContains(mc_alter_catalog_name, + properties.collect { key, value -> "\"${key}\" = \"${value}\"" }) + sql """switch `${mc_alter_catalog_name}`;""" + sql """use `${mc_db}`;""" + qN05() + } // replay test sql """drop catalog if exists ${mc_catalog_name};""" @@ -386,10 +986,14 @@ suite("test_external_catalog_maxcompute", "p2,external,maxcompute,external_remot sql """ use `${mc_db}`; """ order_qt_replay_q6 """ select * from mc_parts where dt >= 
'2023-08-03' and mc_bigint > 1001 order by mc_bigint """ + // C-04: REFRESH CATALOG // test multi partitions prune sql """ refresh catalog ${mc_catalog_name} """ sql """ switch `${mc_catalog_name}`; """ sql """ use `${mc_db}`; """ + List<List<Object>> refreshedTables = sql """ show tables """ + assertTrue(refreshedTables.any { it[0].toString() == "web_site" }, + "table web_site should still be visible after refresh catalog") qt_multi_partition_q1 """ desc multi_partitions """ order_qt_multi_partition_q1 """ show partitions from multi_partitions; """ diff --git a/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute_fault_tolerance.groovy b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute_fault_tolerance.groovy new file mode 100644 index 00000000000..64bd3111bc4 --- /dev/null +++ b/regression-test/suites/external_table_p2/maxcompute/test_external_catalog_maxcompute_fault_tolerance.groovy @@ -0,0 +1,253 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import java.util.concurrent.TimeUnit + +suite("test_external_catalog_maxcompute_fault_tolerance", "p2,external") { + // draft + return + String enabled = context.config.otherConfigs.get("enableMaxComputeTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable MaxCompute test.") + return + } + + String ak = context.config.otherConfigs.get("ak") + String sk = context.config.otherConfigs.get("sk") + + String baseEndpoint = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" + String connectFailureEndpoint = "http://203.0.113.1:81/api" + String queryProject = "mc_datalake" + String writeProject = "mc_doris_test_write" + + String baseCatalog = "test_external_mc_fault_tolerance" + String connectTimeoutCatalog = "${baseCatalog}_connect_timeout" + String readSetupCatalog = "${baseCatalog}_read_setup" + String readTimeoutCatalog = "${baseCatalog}_read_timeout" + String retryZeroCatalog = "${baseCatalog}_retry_zero" + String invalidCredentialCatalog = "${baseCatalog}_invalid_credentials" + + String largeReadDb = "mc_fault_tolerance_db" + String largeReadTable = "mc_fault_tolerance_large_read" + String internalDb = "mc_fault_tolerance_internal" + String internalTable = "mc_fault_tolerance_src" + + def baseCatalogProperties = { String project, String endpoint -> + new LinkedHashMap<String, String>([ + "type": "max_compute", + "mc.default.project": project, + "mc.access_key": ak, + "mc.secret_key": sk, + "mc.endpoint": endpoint + ]) + } + + def renderCatalogProperties = { Map<String, String> properties -> + properties.collect { key, value -> "\"${key}\" = \"${value}\"" }.join(",\n ") + } + + def createCatalog = { String catalogName, Map<String, String> properties -> + sql """switch internal""" + sql """drop catalog if exists ${catalogName}""" + sql """ + create catalog if not exists ${catalogName} properties ( + ${renderCatalogProperties(properties)} + ); + """ + } + + def switchToCatalogDb = { String catalogName, String dbName -> 
+ sql """switch `${catalogName}`""" + sql """use `${dbName}`""" + } + + def expectFailureWithin = { String caseId, long maxElapsedMs, Closure action -> + long startMs = System.currentTimeMillis() + Throwable failure = null + try { + action.call() + } catch (Throwable t) { + failure = t + } + long elapsedMs = System.currentTimeMillis() - startMs + assertTrue(failure != null, "${caseId} should fail") + assertTrue(elapsedMs < maxElapsedMs, + "${caseId} should fail within ${maxElapsedMs} ms, actual ${elapsedMs} ms, message: ${failure.getMessage()}") + [failure, elapsedMs] + } + + def assertMessageContainsAny = { String caseId, Throwable failure, List<String> fragments -> + String message = failure.toString() + assertTrue(fragments.any { fragment -> message.toLowerCase().contains(fragment.toLowerCase()) }, + "${caseId} expected message containing one of ${fragments}, actual: ${message}") + } + + // F-01: Validate the short connect timeout path with a connect-family endpoint failure. + Map<String, String> connectTimeoutProperties = baseCatalogProperties(queryProject, connectFailureEndpoint) + connectTimeoutProperties.put("mc.connect_timeout", "1") + connectTimeoutProperties.put("mc.read_timeout", "1") + connectTimeoutProperties.put("mc.retry_count", "1") + createCatalog(connectTimeoutCatalog, connectTimeoutProperties) + def (connectFailure, connectElapsedMs) = expectFailureWithin( + "F-01", TimeUnit.SECONDS.toMillis(20)) { + sql """show databases from ${connectTimeoutCatalog}""" + } + assertMessageContainsAny("F-01", connectFailure, [ + "timed out", + "timeout", + "connection refused", + "no route to host", + "network is unreachable", + "endpoint" + ]) + logger.info("F-01 failed in ${connectElapsedMs} ms with message: ${connectFailure.getMessage()}") + + // F-02: Prepare a large MaxCompute table and validate the read-timeout path on the scan chain. 
+ sql """CREATE DATABASE IF NOT EXISTS internal.${internalDb}""" + sql """DROP TABLE IF EXISTS internal.${internalDb}.${internalTable}""" + sql """ + CREATE TABLE internal.${internalDb}.${internalTable} ( + id INT, + name STRING, + val DOUBLE, + ds STRING + ) + DISTRIBUTED BY HASH(id) BUCKETS 8 + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + sql """ + INSERT INTO internal.${internalDb}.${internalTable} + SELECT + number AS id, + concat('name_', cast(number AS STRING)) AS name, + number * 0.01 AS val, + concat('2025010', cast((number % 8 + 1) AS STRING)) AS ds + FROM numbers("number"="2000000") + """ + + Map<String, String> readSetupProperties = baseCatalogProperties(writeProject, baseEndpoint) + readSetupProperties.put("mc.quota", "pay-as-you-go") + readSetupProperties.put("mc.enable.namespace.schema", "true") + createCatalog(readSetupCatalog, readSetupProperties) + + sql """switch `${readSetupCatalog}`""" + sql """drop database if exists ${largeReadDb}""" + sql """refresh catalog ${readSetupCatalog}""" + sql """create database ${largeReadDb}""" + switchToCatalogDb(readSetupCatalog, largeReadDb) + sql """drop table if exists ${largeReadTable}""" + sql """ + CREATE TABLE ${largeReadTable} ( + id INT, + name STRING, + val DOUBLE, + ds STRING + ) + """ + sql """INSERT INTO ${largeReadTable} SELECT * FROM internal.${internalDb}.${internalTable}""" + List<List<Object>> normalReadCount = sql """SELECT count(*) FROM ${largeReadTable}""" + assertEquals("2000000", normalReadCount[0][0].toString()) + + Map<String, String> readTimeoutProperties = baseCatalogProperties(writeProject, baseEndpoint) + readTimeoutProperties.put("mc.quota", "pay-as-you-go") + readTimeoutProperties.put("mc.enable.namespace.schema", "true") + readTimeoutProperties.put("mc.read_timeout", "1") + readTimeoutProperties.put("mc.retry_count", "1") + createCatalog(readTimeoutCatalog, readTimeoutProperties) + def (readFailure, readElapsedMs) = expectFailureWithin( + "F-02", 
TimeUnit.SECONDS.toMillis(30)) { + switchToCatalogDb(readTimeoutCatalog, largeReadDb) + sql """select count(*) from ${largeReadTable}""" + } + assertMessageContainsAny("F-02", readFailure, [ + "timed out", + "timeout", + "read timed out", + "sockettimeoutexception" + ]) + logger.info("F-02 failed in ${readElapsedMs} ms with message: ${readFailure.getMessage()}") + + // F-03: The current implementation rejects retry_count=0 at DDL validation time. + test { + sql """switch internal""" + sql """drop catalog if exists ${retryZeroCatalog}""" + sql """ + create catalog if not exists ${retryZeroCatalog} properties ( + "type" = "max_compute", + "mc.default.project" = "${queryProject}", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "${baseEndpoint}", + "mc.retry_count" = "0" + ); + """ + exception "mc.retry_count must be greater than 0" + } + + // F-04: Cover the authentication failure path with invalid AK/SK credentials. + Map<String, String> invalidCredentialProperties = baseCatalogProperties(queryProject, baseEndpoint) + invalidCredentialProperties.put("mc.access_key", "invalid_ak_for_regression") + invalidCredentialProperties.put("mc.secret_key", "invalid_sk_for_regression") + createCatalog(invalidCredentialCatalog, invalidCredentialProperties) + def (authFailure, authElapsedMs) = expectFailureWithin( + "F-04", TimeUnit.SECONDS.toMillis(20)) { + sql """show databases from ${invalidCredentialCatalog}""" + } + assertMessageContainsAny("F-04", authFailure, [ + "invalid credentials", + "odps-0410051", + "accesskeyid not found", + "authentication" + ]) + logger.info("F-04 failed in ${authElapsedMs} ms with message: ${authFailure.getMessage()}") + + // F-05: Validate multi-session concurrent reads on the same MaxCompute catalog. 
+ createCatalog(baseCatalog, baseCatalogProperties(queryProject, baseEndpoint)) + List<List<String>> concurrentResults = Collections.synchronizedList(new ArrayList<List<String>>()) + List<String> concurrentErrors = Collections.synchronizedList(new ArrayList<String>()) + List<Thread> queryThreads = [] + int sessionCount = 4 + (1..sessionCount).each { sessionId -> + queryThreads.add(Thread.start { + try { + List<String> sessionResult = connect(context.config.jdbcUser, + context.config.jdbcPassword, context.config.jdbcUrl) { + sql """switch `${baseCatalog}`""" + sql """use `${queryProject}`""" + List<List<Object>> webSiteCount = sql """select count(*) from web_site""" + List<List<Object>> partitionCount = + sql """select count(*) from mc_parts where dt >= '2023-08-03'""" + [ + webSiteCount[0][0].toString(), + partitionCount[0][0].toString() + ] + } + concurrentResults.add(sessionResult) + } catch (Throwable t) { + concurrentErrors.add("session ${sessionId}: ${t.getMessage()}") + } + }) + } + queryThreads.each { thread -> thread.join() } + assertTrue(concurrentErrors.isEmpty(), "F-05 session errors: ${concurrentErrors}") + assertEquals(sessionCount, concurrentResults.size()) + concurrentResults.each { result -> + assertEquals(["5", "3"], result) + } + + sql """switch internal""" +} diff --git a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy index df8c37b2e62..31ef6286445 100644 --- a/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/test_max_compute_create_table.groovy @@ -218,6 +218,27 @@ suite("test_max_compute_create_table", "p2,external,maxcompute,external_remote,e sql """show tables like '${test9_table}' """ qt_test9_show_create_table """SHOW CREATE TABLE ${test9_table} """ + // 
============================================================================ + // Test 9.1: ERROR CASE - CREATE TABLE with duplicate table name + // ============================================================================ + String test9_1_table = "test_mc_duplicate_table_name" + sql """DROP TABLE IF EXISTS ${test9_1_table}""" + sql """ + CREATE TABLE ${test9_1_table} ( + id INT, + name STRING + ) + """ + test { + sql """ + CREATE TABLE ${test9_1_table} ( + id INT, + name STRING + ) + """ + exception "already exists" + } + // ============================================================================ // Test 10: CREATE TABLE with ARRAY type (supported by MaxCompute) // ============================================================================ @@ -266,18 +287,32 @@ suite("test_max_compute_create_table", "p2,external,maxcompute,external_remote,e qt_test12_show_create_table """SHOW CREATE TABLE ${test12_table} """ // ============================================================================ - // Test 13: ERROR CASE - CREATE TABLE with unsupported type (IPV4) - // ============================================================================ - String test13_table = "test_mc_unsupported_type_table" - sql """DROP TABLE IF EXISTS ${test13_table}""" - test { - sql """ - CREATE TABLE ${test13_table} ( - id INT, - ip IPV4 - ) - """ - exception "Unsupported" + // Test 13: ERROR CASE - CREATE TABLE with Doris-supported but MaxCompute-unsupported types + // ============================================================================ + List<Map<String, String>> unsupportedTypeCases = [ + [suffix: "ipv4", type: "IPV4", exceptionMsg: "Unsupported Doris type for MaxCompute"], + [suffix: "ipv6", type: "IPV6", exceptionMsg: "Unsupported Doris type for MaxCompute"], + [suffix: "largeint", type: "LARGEINT", exceptionMsg: "Unsupported Doris type for MaxCompute"], + [suffix: "hll", type: "HLL", exceptionMsg: "Key column can not set complex type"], + [suffix: "bitmap", type: 
"BITMAP", exceptionMsg: "Key column can not set complex type"], + [suffix: "quantile_state", type: "QUANTILE_STATE", exceptionMsg: "Key column can not set complex type"], + [suffix: "agg_state", type: "AGG_STATE<sum(INT)>", + exceptionMsg: "Aggregation columns are not supported for MaxCompute tables"], + [suffix: "json", type: "JSON", exceptionMsg: "Unsupported Doris type for MaxCompute: JSON"], + [suffix: "variant", type: "VARIANT", exceptionMsg: "Unsupported Doris type for MaxCompute: VARIANT"] + ] + unsupportedTypeCases.each { testCase -> + String test13Table = "test_mc_unsupported_type_${testCase.suffix}" + sql """DROP TABLE IF EXISTS ${test13Table}""" + test { + sql """ + CREATE TABLE ${test13Table} ( + id INT, + unsupported_col ${testCase.type} + ) + """ + exception "${testCase.exceptionMsg}" + } } // ============================================================================ @@ -388,6 +423,11 @@ suite("test_max_compute_create_table", "p2,external,maxcompute,external_remote,e ) """ def schema = sql """SHOW CREATE TABLE ${test25_table}""" + String recreatedSchema = schema.collect { row -> + row.collect { it == null ? "" : it.toString() }.join(" ") + }.join("\n") + assertTrue(recreatedSchema.toLowerCase().contains("name") && recreatedSchema.toUpperCase().contains("DOUBLE"), + "Recreated table should expose the new schema") // ============================================================================ // Test 29: DROP TABLE without IF EXISTS (non-existent) diff --git a/regression-test/suites/external_table_p2/maxcompute/test_mc_join.groovy b/regression-test/suites/external_table_p2/maxcompute/test_mc_join.groovy new file mode 100644 index 00000000000..31e31b41a75 --- /dev/null +++ b/regression-test/suites/external_table_p2/maxcompute/test_mc_join.groovy @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_mc_join", "p2,external,maxcompute") { + String enabled = context.config.otherConfigs.get("enableMaxComputeTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable MaxCompute test.") + return + } + + boolean hiveEnabled = "true".equalsIgnoreCase(context.config.otherConfigs.get("enableHiveTest")) + boolean icebergEnabled = "true".equalsIgnoreCase(context.config.otherConfigs.get("enableIcebergTest")) + + String ak = context.config.otherConfigs.get("ak") + String sk = context.config.otherConfigs.get("sk") + String mcCatalog = "test_mc_join_mc" + String otherMcCatalog = "test_mc_join_mc_other" + String internalDb = "test_mc_join_internal_db" + String internalTb = "test_mc_join_internal_tbl" + + sql """drop catalog if exists ${mcCatalog}""" + sql """drop catalog if exists ${otherMcCatalog}""" + sql """ + create catalog if not exists ${mcCatalog} properties ( + "type" = "max_compute", + "mc.default.project" = "mc_datalake_schema", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api", + "mc.enable.namespace.schema" = "true" + ); + """ + sql """ + create catalog if not exists ${otherMcCatalog} properties ( + "type" = "max_compute", + 
"mc.default.project" = "other_mc_datalake_test", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api" + ); + """ + + sql """create database if not exists internal.${internalDb}""" + sql """drop table if exists internal.${internalDb}.${internalTb}""" + sql """ + create table internal.${internalDb}.${internalTb} ( + id int, + dept string + ) + distributed by hash(id) buckets 1 + properties ("replication_allocation" = "tag.location.default: 1") + """ + sql """ + insert into internal.${internalDb}.${internalTb} values + (1, 'finance'), + (2, 'it'), + (4, 'ops') + """ + + // J-01: Join a MaxCompute table with an internal OLAP table. + order_qt_j01_mc_join_internal """ + select u.id, u.name, i.dept + from ${mcCatalog}.`default`.user_info u + join internal.${internalDb}.${internalTb} i + on u.id = i.id + order by u.id + """ + + if (hiveEnabled) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String hmsPort = context.config.otherConfigs.get("hive3HmsPort") + String hdfsPort = context.config.otherConfigs.get("hive3HdfsPort") + String hiveCatalog = "test_mc_join_external_hive" + String hiveDb = "test_mc_join_hive_db" + String hiveTb = "test_mc_join_hive_tbl" + + sql """drop catalog if exists ${hiveCatalog}""" + sql """ + create catalog if not exists ${hiveCatalog} properties ( + "type" = "hms", + "hive.metastore.uris" = "thrift://${externalEnvIp}:${hmsPort}", + "fs.defaultFS" = "hdfs://${externalEnvIp}:${hdfsPort}" + ); + """ + + sql """switch ${hiveCatalog}""" + sql """create database if not exists ${hiveDb}""" + sql """use ${hiveDb}""" + sql """drop table if exists ${hiveTb}""" + sql """ + create table ${hiveTb} ( + id int, + city string, + segment string + ) + """ + sql """ + insert into ${hiveTb} values + (1, 'Beijing', 'north'), + (2, 'Shanghai', 'east'), + (8, 'Hangzhou', 'west') + """ + + // J-02: Join a Hive table with a MaxCompute table. 
+ order_qt_j02_mc_join_hive """ + select u.id, u.name, h.segment + from ${mcCatalog}.`default`.user_info u + join ${hiveCatalog}.${hiveDb}.${hiveTb} h + on u.id = h.id and u.city = h.city + order by u.id + """ + } else { + logger.info("skip Hive join case because enableHiveTest is false") + } + + if (icebergEnabled) { + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String icebergCatalog = "test_mc_join_external_iceberg" + String icebergDb = "test_mc_join_iceberg_db" + String icebergTb = "test_mc_join_iceberg_tbl" + + sql """drop catalog if exists ${icebergCatalog}""" + sql """ + create catalog if not exists ${icebergCatalog} properties ( + "type" = "iceberg", + "iceberg.catalog.type" = "rest", + "uri" = "http://${externalEnvIp}:${rest_port}", + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + ); + """ + + sql """switch ${icebergCatalog}""" + sql """create database if not exists ${icebergDb}""" + sql """use ${icebergDb}""" + sql """drop table if exists ${icebergTb}""" + sql """create table ${icebergTb} (id int, tier string)""" + sql """ + insert into ${icebergTb} values + (1, 'gold'), + (3, 'silver'), + (9, 'bronze') + """ + + // J-03: Join an Iceberg table with a MaxCompute table. + order_qt_j02_mc_join_iceberg """ + select u.id, u.name, i.tier + from ${mcCatalog}.`default`.user_info u + join ${icebergCatalog}.${icebergDb}.${icebergTb} i + on u.id = i.id + order by u.id + """ + } else { + logger.info("skip Iceberg join case because enableIcebergTest is false") + } + + // J-04: Join tables across two MaxCompute catalogs from different projects. 
+ order_qt_j03_mc_join_cross_catalog """ + select u.id, u.name, o.col + from ${mcCatalog}.`default`.user_info u + join ${otherMcCatalog}.other_mc_datalake_test.other_db_mc_tb o + on u.id = o.id + order by u.id + """ +} diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_ctas.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_ctas.groovy index 186fa3f87e7..83e1dd75fce 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_ctas.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_ctas.groovy @@ -100,7 +100,9 @@ suite("test_mc_write_ctas", "p2,external,maxcompute,external_remote,external_rem sql """CREATE TABLE ${tb5} AS SELECT count(*) AS cnt, sum(val) AS total FROM ${src}""" order_qt_ctas_agg """ SELECT * FROM ${tb5} """ - // Test 6: CTAS from internal catalog (cross-catalog) + // Test 6: CTAS from catalog (cross-catalog) + String internal_db = "mc_ctas_internal_${uuid}" + String internal_tb = "ctas_internal_src_${uuid}" sql """CREATE DATABASE IF NOT EXISTS internal.${internal_db}""" sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}""" sql """ diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_insert.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_insert.groovy index 031918add73..e6c18105240 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_insert.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_insert.groovy @@ -44,11 +44,41 @@ suite("test_mc_write_insert", "p2,external,maxcompute,external_remote,external_r def uuid = UUID.randomUUID().toString().replace("-", "").substring(0, 8) String db = "mc_write_insert_${uuid}" + String internal_db = "mc_write_internal_${uuid}" + String internal_tb = "insert_src_${uuid}" + String unsupported_schema = "default" + String clustered_tb = 
"mc_write_unsupported_clustered" + String transactional_tb = "mc_write_unsupported_transactional" + String delta_tb = "mc_write_unsupported_delta" + String external_tb = "mc_write_unsupported_external" sql """drop database if exists ${db}""" sql """create database ${db}""" sql """use ${db}""" + def getProfileTextBySql = { String stmt -> + def profileAction = new org.apache.doris.regression.action.ProfileAction(context) + String profileId = "" + int attempts = 0 + while (attempts < 10 && (profileId == null || profileId.isEmpty())) { + List profileData = profileAction.getProfileList() + for (def profileItem : profileData) { + if (profileItem["Sql Statement"].toString().contains(stmt)) { + profileId = profileItem["Profile ID"].toString() + break + } + } + if (profileId == null || profileId.isEmpty()) { + Thread.sleep(300) + } + attempts++ + } + assertTrue(profileId != null && !profileId.isEmpty(), + "Profile ID of ${stmt} is not found") + Thread.sleep(500) + return profileAction.getProfile(profileId).toString() + } + try { // Test 1: Basic INSERT INTO with VALUES String tb1 = "basic_insert_${uuid}" @@ -108,7 +138,108 @@ suite("test_mc_write_insert", "p2,external,maxcompute,external_remote,external_r sql """INSERT INTO ${tb4} VALUES (2, 'batch2')""" sql """INSERT INTO ${tb4} VALUES (3, 'batch3')""" order_qt_multi_batch """ SELECT * FROM ${tb4} """ + + // Test 5: INSERT INTO MC table from internal catalog table + sql """CREATE DATABASE IF NOT EXISTS internal.${internal_db}""" + sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}""" + sql """ + CREATE TABLE internal.${internal_db}.${internal_tb} ( + id INT, + name STRING, + value DOUBLE + ) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + sql """ + INSERT INTO internal.${internal_db}.${internal_tb} VALUES + (11, 'internal_alice', 110.5), + (12, 'internal_bob', 120.3), + (13, 'internal_charlie', 130.7) + """ + + String tb5 = 
"cross_catalog_insert_${uuid}" + sql """DROP TABLE IF EXISTS ${tb5}""" + sql """ + CREATE TABLE ${tb5} ( + id INT, + name STRING, + value DOUBLE + ) + """ + sql """INSERT INTO ${tb5} SELECT * FROM internal.${internal_db}.${internal_tb}""" + order_qt_cross_catalog_insert """ SELECT * FROM ${tb5} """ + + // Test 5b: INSERT INTO SELECT with LIMIT should keep serial read behavior + String limit_src = "limit_src_${uuid}" + sql """DROP TABLE IF EXISTS internal.${internal_db}.${limit_src}""" + sql """ + CREATE TABLE internal.${internal_db}.${limit_src} ( + id INT, + name STRING + ) + DISTRIBUTED BY HASH(id) BUCKETS 5 + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + sql """ + INSERT INTO internal.${internal_db}.${limit_src} + SELECT + number + 1 AS id, + concat('limit_', cast(number + 1 AS STRING)) AS name + FROM numbers("number"="200") + """ + + String tb_limit = "limit_insert_${uuid}" + sql """DROP TABLE IF EXISTS ${tb_limit}""" + sql """ + CREATE TABLE ${tb_limit} ( + id INT, + name STRING + ) + """ + String limitInsertSql = "INSERT INTO ${tb_limit} SELECT id, name " + + "FROM internal.${internal_db}.${limit_src} ORDER BY id LIMIT 100" + sql """set enable_profile=true""" + sql """set profile_level=2""" + sql """set parallel_pipeline_task_num=1""" + sql limitInsertSql + qt_limit_insert_count """ SELECT count(*) FROM ${tb_limit} """ + order_qt_limit_insert_top10 """ SELECT * FROM ${tb_limit} ORDER BY id LIMIT 10 """ + String limitInsertProfile = getProfileTextBySql(limitInsertSql) + assertTrue(limitInsertProfile.contains("MaxScanConcurrency: 1"), + "LIMIT insert should use serial read, profile: ${limitInsertProfile}") + sql """set enable_profile=false""" + + sql """USE ${mc_catalog_name}.`${unsupported_schema}`""" + // Test 6: INSERT INTO clustered MaxCompute table should fail + sql """DESC ${clustered_tb}""" + test { + sql """INSERT OVERWRITE TABLE ${clustered_tb} VALUES (21, 'clustered_row', 210.5)""" + exception "Writing cluster table is not 
supported yet" + } + // Test 7: INSERT INTO transactional MaxCompute table should fail + sql """DESC ${transactional_tb}""" + test { + sql """INSERT INTO ${transactional_tb} VALUES (22, 'transactional_row', 220.5)""" + exception "not supported by storage api" + } + + // Test 8: INSERT INTO Delta MaxCompute table should fail + sql """DESC ${delta_tb}""" + test { + sql """INSERT INTO ${delta_tb} VALUES (23, 'delta_row', 230.5)""" + exception "not supported by storage api" + } + + // Test 9: INSERT INTO Delta Lake external MaxCompute table should fail + sql """DESC ${external_tb}""" + test { + sql """INSERT INTO ${external_tb} VALUES (24, 'external_row', 240.5)""" + exception "mc_write_unsupported_external: No oss endpoint provide" + } } finally { + sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}""" + sql """DROP DATABASE IF EXISTS internal.${internal_db}""" sql """drop database if exists ${mc_catalog_name}.${db}""" } } diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy index 2d6e89c594a..66f035f4552 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_large_data.groovy @@ -135,6 +135,21 @@ suite("test_mc_write_large_data", "p2,external,maxcompute,external_remote,extern qt_count_multi_part """ SELECT count(*) FROM ${tb3} """ order_qt_top10_multi_part """ SELECT * FROM ${tb3} ORDER BY id LIMIT 10 """ + + // Test 3b: Oversized field should fail with the default 8 MB field limit + String tb4 = "oversize_field_${uuid}" + sql """DROP TABLE IF EXISTS ${tb4}""" + sql """ + CREATE TABLE ${tb4} ( + id INT, + payload STRING + ) + """ + int overDefaultFieldSize = 8 * 1024 * 1024 + 1 + test { + sql """INSERT INTO ${tb4} SELECT 1, repeat('x', ${overDefaultFieldSize})""" + exception 
"ODPS-0020041" + } } finally { sql """drop database if exists ${mc_catalog_name}.${db}""" sql """DROP TABLE IF EXISTS internal.${internal_db}.${internal_tb}""" diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy index f2807148398..48c15074a9e 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_write_static_partitions.groovy @@ -176,6 +176,27 @@ suite("test_mc_write_static_partitions", "p2,external,maxcompute,external_remote contains "SORT" } + // Test 5b: Dynamic partition INSERT should fail without strict consistency + String tb5_strict = "dynamic_strict_${uuid}" + sql """DROP TABLE IF EXISTS ${tb5_strict}""" + sql """ + CREATE TABLE ${tb5_strict} ( + id INT, + name STRING, + ds STRING + ) PARTITION BY (ds)() + """ + sql """set enable_strict_consistency_dml=false""" + explain { + sql("INSERT INTO ${tb5_strict} VALUES (1, 'a', '20250101'), (2, 'b', '20250102'), (3, 'c', '20250101')") + notContains "SORT" + } + test { + sql """INSERT INTO ${tb5_strict} VALUES (1, 'a', '20250101'), (2, 'b', '20250102'), (3, 'c', '20250101')""" + exception "writer has been closed" + } + sql """set enable_strict_consistency_dml=true""" + // Test 6: INSERT OVERWRITE non-partitioned table String tb6 = "overwrite_nopart_${uuid}" sql """DROP TABLE IF EXISTS ${tb6}""" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
