morningman commented on code in PR #59432: URL: https://github.com/apache/doris/pull/59432#discussion_r2802596812
########## fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalogFactory.java: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.datasource.ExternalCatalog; + +import java.util.Map; + +public class FlussExternalCatalogFactory { Review Comment: I think this factory is unnecessary? ########## .github/workflows/fluss-integration.yml: ########## @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: Fluss Integration Tests Review Comment: We don't run e2e tests like this. Please refer to `docker/thirdparties/` to see how we test with Paimon. In short, you need to build a docker env dedicated to Fluss, and run e2e tests written in `regression-test/suites/external_table_p0/`. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussMetadataOps.java: ########## @@ -0,0 +1,495 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +public class FlussMetadataOps implements ExternalMetadataOps { + private static final Logger LOG = LogManager.getLogger(FlussMetadataOps.class); + + private static final int MAX_RETRY_ATTEMPTS = 3; + private static final long INITIAL_RETRY_DELAY_MS = 100; + private static final long MAX_RETRY_DELAY_MS = 5000; + + private final FlussExternalCatalog catalog; + private final String bootstrapServers; + + private final Map<String, FlussExternalTable.FlussTableMetadata> tableMetadataCache; + private final Map<String, List<String>> databaseTablesCache; + private final ReadWriteLock cacheLock = new 
ReentrantReadWriteLock(); + + private volatile Connection connection; + private volatile Admin admin; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Object connectionLock = new Object(); + + public FlussMetadataOps(FlussExternalCatalog catalog) { Review Comment: This constructor is not used ########## fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalog.java: ########## @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; +import org.apache.doris.transaction.TransactionManagerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class FlussExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(FlussExternalCatalog.class); + + public static final String FLUSS_COORDINATOR_URI = "fluss.coordinator.uri"; + public static final String FLUSS_BOOTSTRAP_SERVERS = "bootstrap.servers"; + public static final String FLUSS_SECURITY_PROTOCOL = "fluss.security.protocol"; + public static final String FLUSS_SASL_MECHANISM = "fluss.sasl.mechanism"; + public static final String FLUSS_SASL_USERNAME = "fluss.sasl.username"; + public static final String FLUSS_SASL_PASSWORD = "fluss.sasl.password"; + public static final String FLUSS_ENABLE_MAPPING_VARBINARY = "fluss.enable.mapping.varbinary"; + public static final String FLUSS_TABLE_META_CACHE_TTL_SECOND = "fluss.table.meta.cache.ttl.second"; + + protected Connection flussConnection; + protected Admin flussAdmin; + + public 
FlussExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, + String comment) { + super(catalogId, name, InitCatalogLog.Type.FLUSS, comment); + this.catalogProperty = new CatalogProperty(resource, props); + } + + @Override + public void checkProperties() throws DdlException { + super.checkProperties(); + String coordinatorUri = catalogProperty.getOrDefault(FLUSS_COORDINATOR_URI, null); + String bootstrapServers = catalogProperty.getOrDefault(FLUSS_BOOTSTRAP_SERVERS, null); + if (StringUtils.isEmpty(coordinatorUri) && StringUtils.isEmpty(bootstrapServers)) { + throw new DdlException("Missing required property: " + FLUSS_COORDINATOR_URI + + " or " + FLUSS_BOOTSTRAP_SERVERS); + } + } + + @Override + protected void initLocalObjectsImpl() { + Configuration conf = createFlussConfiguration(); + flussConnection = ConnectionFactory.createConnection(conf); + flussAdmin = flussConnection.getAdmin(); + initPreExecutionAuthenticator(); + FlussMetadataOps ops = ExternalMetadataOperations.newFlussMetadataOps(this, flussConnection); + threadPoolWithPreAuth = ThreadPoolManager.newDaemonFixedThreadPoolWithPreAuth( Review Comment: No need to init this thread pool unless you actually use it in the Fluss catalog. ########## fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussMetadataOps.java: ########## @@ -0,0 +1,495 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +public class FlussMetadataOps implements ExternalMetadataOps { + private static final Logger LOG = LogManager.getLogger(FlussMetadataOps.class); + + private static final int 
MAX_RETRY_ATTEMPTS = 3; + private static final long INITIAL_RETRY_DELAY_MS = 100; + private static final long MAX_RETRY_DELAY_MS = 5000; + + private final FlussExternalCatalog catalog; + private final String bootstrapServers; + + private final Map<String, FlussExternalTable.FlussTableMetadata> tableMetadataCache; + private final Map<String, List<String>> databaseTablesCache; + private final ReadWriteLock cacheLock = new ReentrantReadWriteLock(); + + private volatile Connection connection; + private volatile Admin admin; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private final AtomicBoolean closed = new AtomicBoolean(false); + private final Object connectionLock = new Object(); + + public FlussMetadataOps(FlussExternalCatalog catalog) { + this.catalog = catalog; + this.bootstrapServers = catalog.getBootstrapServers(); + this.tableMetadataCache = new HashMap<>(); + this.databaseTablesCache = new HashMap<>(); + } + + public FlussMetadataOps(org.apache.doris.datasource.ExternalCatalog catalog, Connection connection) { Review Comment: ```suggestion public FlussMetadataOps(FlussExternalCatalog catalog, Connection connection) { ``` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussExternalCatalog.java: ########## @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.common.DdlException; +import org.apache.doris.common.ThreadPoolManager; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.operations.ExternalMetadataOperations; +import org.apache.doris.transaction.TransactionManagerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.TableNotExistException; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +public class FlussExternalCatalog extends ExternalCatalog { + private static final Logger LOG = LogManager.getLogger(FlussExternalCatalog.class); + + public static final String FLUSS_COORDINATOR_URI = "fluss.coordinator.uri"; Review Comment: You need to define a `FlussProperties` class that extends `MetastoreProperties`.
And all parsing logic should be in `FlussProperties` ########## fe/fe-core/src/main/java/org/apache/doris/datasource/fluss/FlussMetadataOps.java: ########## @@ -0,0 +1,495 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.fluss; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagInfo; +import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropBranchInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DropTagInfo; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.config.Configuration; +import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +public class FlussMetadataOps implements ExternalMetadataOps { + private static final Logger LOG = LogManager.getLogger(FlussMetadataOps.class); + + private static final int MAX_RETRY_ATTEMPTS = 3; + private static final long INITIAL_RETRY_DELAY_MS = 100; + private static final long MAX_RETRY_DELAY_MS = 5000; + + private final FlussExternalCatalog catalog; + private final String bootstrapServers; + + private final Map<String, FlussExternalTable.FlussTableMetadata> tableMetadataCache; Review Comment: You can refer to `fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/AbstractIcebergProperties.java` to see how the cache framework works. Or maybe we can simply remove this cache in this MVP version and add it later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
