This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 29736b8c01924b7da03d4bcbfd9c812a8e5a08b4 Author: fengli <ldliu...@163.com> AuthorDate: Mon May 6 20:24:16 2024 +0800 [FLINK-35195][table] Support execute CreateMaterializedTableOperation for continuous refresh mode in SqlGateway --- flink-table/flink-sql-gateway/pom.xml | 6 + .../MaterializedTableManager.java | 182 ++++++++++++++ .../service/operation/OperationExecutor.java | 25 +- .../service/MaterializedTableStatementITCase.java | 274 +++++++++++++++++++++ 4 files changed, 483 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml index 1a50d665a18..61f1e75942e 100644 --- a/flink-table/flink-sql-gateway/pom.xml +++ b/flink-table/flink-sql-gateway/pom.xml @@ -127,6 +127,12 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-filesystem-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java new file mode 100644 index 00000000000..fed60634a3a --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.service.operation.OperationExecutor; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation; +import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation; +import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation; +import org.apache.flink.table.refresh.ContinuousRefreshHandler; +import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING; +import static org.apache.flink.configuration.DeploymentOptions.TARGET; 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
+
+/**
+ * Manager responsible for executing {@link MaterializedTableOperation}s.
+ *
+ * <p>Only {@link CreateMaterializedTableOperation} with the continuous refresh mode is handled;
+ * every other operation or refresh mode is rejected with a {@link SqlExecutionException}.
+ */
+@Internal
+public class MaterializedTableManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MaterializedTableManager.class);
+
+    /**
+     * Dispatches a materialized table operation to the matching handler.
+     *
+     * @param operationExecutor executor used to run catalog operations and to submit statements
+     * @param handle handle of the operation that is being executed
+     * @param op the materialized table operation to execute
+     * @param statement the statement the operation was parsed from; not used by this method
+     * @return the fetcher that exposes the result of the operation
+     * @throws SqlExecutionException if the operation is not a {@link
+     *     CreateMaterializedTableOperation}
+     */
+    public static ResultFetcher callMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            MaterializedTableOperation op,
+            String statement) {
+        if (op instanceof CreateMaterializedTableOperation) {
+            return callCreateMaterializedTableOperation(
+                    operationExecutor, handle, (CreateMaterializedTableOperation) op);
+        }
+        throw new SqlExecutionException(
+                String.format(
+                        "Unsupported Operation %s for materialized table.", op.asSummaryString()));
+    }
+
+    /**
+     * Creates the materialized table described by the operation.
+     *
+     * @return a fetcher over {@code TABLE_RESULT_OK}
+     * @throws SqlExecutionException if the refresh mode of the table is not continuous
+     */
+    private static ResultFetcher callCreateMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            CreateMaterializedTableOperation createMaterializedTableOperation) {
+        CatalogMaterializedTable materializedTable =
+                createMaterializedTableOperation.getCatalogMaterializedTable();
+        if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
+            createMaterializedInContinuousMode(
+                    operationExecutor, handle, createMaterializedTableOperation);
+        } else {
+            throw new SqlExecutionException(
+                    "Only support create materialized table in continuous refresh mode currently.");
+        }
+        // Just return ok for unify different refresh job info of continuous and full mode, user
+        // should get the refresh job info via desc table.
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    /**
+     * Creates the materialized table in the catalog, submits the {@code INSERT INTO} job that
+     * refreshes it, and records the resulting refresh handler and the {@code ACTIVATED} status
+     * on the table.
+     *
+     * <p>If anything fails after the table has been created, the table is dropped again before
+     * the failure is reported.
+     *
+     * @throws TableException if submitting the refresh job or updating the catalog fails
+     */
+    private static void createMaterializedInContinuousMode(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            CreateMaterializedTableOperation createMaterializedTableOperation) {
+        // create materialized table first
+        operationExecutor.callExecutableOperation(handle, createMaterializedTableOperation);
+
+        ObjectIdentifier materializedTableIdentifier =
+                createMaterializedTableOperation.getTableIdentifier();
+        CatalogMaterializedTable catalogMaterializedTable =
+                createMaterializedTableOperation.getCatalogMaterializedTable();
+
+        // Set job name, runtime mode, checkpoint interval
+        // TODO: Set minibatch related optimization options.
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_continuous_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, STREAMING);
+        customConfig.set(CHECKPOINTING_INTERVAL, catalogMaterializedTable.getFreshness());
+
+        // NOTE(review): the identifier is formatted through toString() here while the job name
+        // above uses asSerializableString() - confirm which form the INSERT statement needs.
+        String insertStatement =
+                String.format(
+                        "INSERT INTO %s %s",
+                        materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery());
+        try {
+            // submit flink streaming job; customConfig has to be handed over explicitly,
+            // otherwise the job name, runtime mode and checkpoint interval set above are ignored
+            ResultFetcher resultFetcher =
+                    operationExecutor.executeStatement(handle, customConfig, insertStatement);
+
+            // get execution.target and jobId, currently doesn't support yarn and k8s, so doesn't
+            // get clusterId
+            List<RowData> results = fetchAllResults(resultFetcher);
+            if (results.isEmpty()) {
+                throw new TableException(
+                        String.format(
+                                "Continuous refresh job of materialized table %s returned no job id.",
+                                materializedTableIdentifier));
+            }
+            String jobId = results.get(0).getString(0).toString();
+            String executeTarget =
+                    operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+            ContinuousRefreshHandler continuousRefreshHandler =
+                    new ContinuousRefreshHandler(executeTarget, jobId);
+            byte[] serializedBytes =
+                    ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
+
+            // update RefreshHandler to Catalog
+            CatalogMaterializedTable updatedMaterializedTable =
+                    catalogMaterializedTable.copy(
+                            CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+                            continuousRefreshHandler.asSummaryString(),
+                            serializedBytes);
+            List<TableChange> tableChanges = new ArrayList<>();
+            tableChanges.add(
+                    TableChange.modifyRefreshStatus(
+                            CatalogMaterializedTable.RefreshStatus.ACTIVATED));
+            tableChanges.add(
+                    TableChange.modifyRefreshHandler(
+                            continuousRefreshHandler.asSummaryString(), serializedBytes));
+
+            AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            materializedTableIdentifier, tableChanges, updatedMaterializedTable);
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        } catch (Exception e) {
+            // log first, so the root cause is recorded even if the rollback below fails as well
+            LOG.error(
+                    "Submit continuous refresh job for materialized table {} occur exception.",
+                    materializedTableIdentifier,
+                    e);
+            TableException failure =
+                    new TableException(
+                            String.format(
+                                    "Submit continuous refresh job for materialized table %s occur exception.",
+                                    materializedTableIdentifier),
+                            e);
+            // drop materialized table while submit flink streaming job occur exception. Thus, weak
+            // atomicity is guaranteed
+            try {
+                operationExecutor.callExecutableOperation(
+                        handle,
+                        new DropMaterializedTableOperation(
+                                materializedTableIdentifier, true, false));
+            } catch (Exception dropFailure) {
+                // keep the original failure as the reported error, the failed rollback is
+                // attached to it instead of replacing it
+                failure.addSuppressed(dropFailure);
+            }
+            throw failure;
+        }
+    }
+
+    /**
+     * Drains the given fetcher, starting at token 0 and following the returned tokens until the
+     * fetcher reports {@code null} as the next token.
+     *
+     * @param resultFetcher fetcher to read from
+     * @return all rows returned by the fetcher, in fetch order
+     */
+    private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
+        Long token = 0L;
+        List<RowData> results = new ArrayList<>();
+        while (token != null) {
+            ResultSet result = resultFetcher.fetchResults(token, Integer.MAX_VALUE);
+            results.addAll(result.getData());
+            token = result.getNextToken();
+        }
+        return results;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index c50ba8c2bbf..ddd0f930155 100644 ---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -65,6 +65,7 @@ import org.apache.flink.table.gateway.api.results.FunctionInfo; import org.apache.flink.table.gateway.api.results.TableInfo; import org.apache.flink.table.gateway.environment.SqlGatewayStreamExecutionEnvironment; import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager; import org.apache.flink.table.gateway.service.result.ResultFetcher; import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.module.ModuleManager; @@ -96,6 +97,7 @@ import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.CreateOperation; import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.DropOperation; +import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation; import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.utils.DateTimeUtils; import org.apache.flink.util.CollectionUtil; @@ -193,9 +195,14 @@ public class OperationExecutor { } public ResultFetcher executeStatement(OperationHandle handle, String statement) { + return executeStatement(handle, new Configuration(), statement); + } + + public ResultFetcher executeStatement( + OperationHandle handle, Configuration customConfig, String statement) { // Instantiate the TableEnvironment lazily ResourceManager resourceManager = sessionContext.getSessionState().resourceManager.copy(); - TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager); + TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager, customConfig); 
PlanCacheManager planCacheManager = sessionContext.getPlanCacheManager(); CachedPlan cachedPlan = null; Operation op = null; @@ -344,13 +351,16 @@ public class OperationExecutor { // -------------------------------------------------------------------------------------------- public TableEnvironmentInternal getTableEnvironment() { - return getTableEnvironment(sessionContext.getSessionState().resourceManager); + return getTableEnvironment( + sessionContext.getSessionState().resourceManager, new Configuration()); } - public TableEnvironmentInternal getTableEnvironment(ResourceManager resourceManager) { + public TableEnvironmentInternal getTableEnvironment( + ResourceManager resourceManager, Configuration customConfig) { // checks the value of RUNTIME_MODE Configuration operationConfig = sessionContext.getSessionConf().clone(); operationConfig.addAll(executionConfig); + operationConfig.addAll(customConfig); final EnvironmentSettings settings = EnvironmentSettings.newInstance().withConfiguration(operationConfig).build(); @@ -492,12 +502,15 @@ public class OperationExecutor { || op instanceof CreateCatalogFunctionOperation || op instanceof ShowFunctionsOperation) { return callExecutableOperation(handle, (ExecutableOperation) op); + } else if (op instanceof MaterializedTableOperation) { + return MaterializedTableManager.callMaterializedTableOperation( + this, handle, (MaterializedTableOperation) op, statement); } else { return callOperation(tableEnv, handle, op); } } - private ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) { + public ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) { TableResultInternal result = op.execute( new ExecutableOperationContextImpl( @@ -521,6 +534,10 @@ public class OperationExecutor { return tableConfig; } + public SessionContext getSessionContext() { + return sessionContext; + } + private ResultFetcher callSetOperation( TableEnvironmentInternal tableEnv, 
OperationHandle handle, SetOperation setOp) { if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) { diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java new file mode 100644 index 00000000000..29ab697f384 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.session.SessionEnvironment; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND; +import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination; 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Integration tests that run materialized table statements through {@link
+ * SqlGatewayServiceImpl}. They live in their own class instead of {@link
+ * SqlGatewayServiceITCase} because the materialized table syntax is fairly self-contained and
+ * keeping it separate avoids conflicting changes in {@link SqlGatewayServiceITCase}.
+ */
+public class MaterializedTableStatementITCase {
+
+    private static final String FILE_CATALOG_STORE = "file_store";
+    private static final String TEST_CATALOG_PREFIX = "test_catalog";
+    private static final String TEST_DEFAULT_DATABASE = "test_db";
+
+    private static final AtomicLong COUNTER = new AtomicLong(0);
+
+    @RegisterExtension
+    @Order(1)
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
+    static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    @RegisterExtension
+    @Order(3)
+    static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "SqlGatewayService Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));
+
+    private static SqlGatewayServiceImpl service;
+    private static SessionEnvironment defaultSessionEnvironment;
+    private static Path baseCatalogPath;
+
+    private String fileSystemCatalogPath;
+    private String fileSystemCatalogName;
+
+    /** Prepares the file catalog store, the catalog base directory and the session environment. */
+    @BeforeAll
+    static void setUp(@TempDir Path temporaryFolder) throws Exception {
+        service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+
+        // directory backing the file based catalog store
+        Path catalogStoreDir = temporaryFolder.resolve(FILE_CATALOG_STORE);
+        Files.createDirectory(catalogStoreDir);
+        Map<String, String> storeOptions = new HashMap<>();
+        storeOptions.put(TABLE_CATALOG_STORE_KIND.key(), "file");
+        storeOptions.put("table.catalog-store.file.path", catalogStoreDir.toString());
+
+        // parent directory of the per-test test-filesystem catalogs
+        baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX);
+        Files.createDirectory(baseCatalogPath);
+
+        defaultSessionEnvironment =
+                SessionEnvironment.newBuilder()
+                        .addSessionConfig(storeOptions)
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .build();
+    }
+
+    /** Creates a fresh catalog directory, including the default database, for every test. */
+    @BeforeEach
+    void before() throws Exception {
+        // the counter value gives every test its own catalog directory and catalog name
+        String suffix = String.valueOf(COUNTER.incrementAndGet());
+        Path catalogDir = baseCatalogPath.resolve(suffix);
+        Files.createDirectory(catalogDir);
+        Files.createDirectory(catalogDir.resolve(TEST_DEFAULT_DATABASE));
+
+        fileSystemCatalogPath = catalogDir.toString();
+        fileSystemCatalogName = TEST_CATALOG_PREFIX + suffix;
+    }
+
+    @Test
+    void testCreateMaterializedTableInContinuousMode() throws Exception {
+        // open a session with the test-filesystem catalog registered and the source table created
+        SessionHandle session = initializeSession();
+
+        OperationHandle createOperation =
+                service.executeStatement(
+                        session, usersShopsDdl("'30' SECOND"), -1, new Configuration());
+        awaitOperationTermination(service, session, createOperation);
+
+        // only the table metadata is verified: the datagen source produces random rows, so the
+        // table content cannot be asserted
+        ObjectIdentifier tableIdentifier =
+                ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops");
+        ResolvedCatalogMaterializedTable createdTable =
+                (ResolvedCatalogMaterializedTable) service.getTable(session, tableIdentifier);
+
+        ResolvedSchema expectedSchema =
+                ResolvedSchema.of(
+                        Arrays.asList(
+                                Column.physical("user_id", DataTypes.BIGINT()),
+                                Column.physical("shop_id", DataTypes.BIGINT()),
+                                Column.physical("ds", DataTypes.STRING()),
+                                Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()),
+                                Column.physical("pv", DataTypes.INT().notNull())));
+
+        assertThat(createdTable.getResolvedSchema()).isEqualTo(expectedSchema);
+        assertThat(createdTable.getFreshness()).isEqualTo(Duration.ofSeconds(30));
+        assertThat(createdTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(createdTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+        assertThat(createdTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+        assertThat(createdTable.getRefreshHandlerDescription()).isNotEmpty();
+        assertThat(createdTable.getSerializedRefreshHandler()).isNotEmpty();
+    }
+
+    @Test
+    void testCreateMaterializedTableInFullMode() {
+        // open a session with the test-filesystem catalog registered and the source table created
+        SessionHandle session = initializeSession();
+
+        OperationHandle createOperation =
+                service.executeStatement(
+                        session, usersShopsDdl("'1' DAY"), -1, new Configuration());
+
+        assertThatThrownBy(() -> awaitOperationTermination(service, session, createOperation))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessage(
+                        "Only support create materialized table in continuous refresh mode currently.");
+    }
+
+    /**
+     * Builds the {@code CREATE MATERIALIZED TABLE users_shops} statement shared by the tests.
+     *
+     * @param freshness text placed after {@code FRESHNESS = INTERVAL}, e.g. {@code '30' SECOND}
+     */
+    private static String usersShopsDdl(String freshness) {
+        return "CREATE MATERIALIZED TABLE users_shops"
+                + " PARTITIONED BY (ds)\n"
+                + " WITH(\n"
+                + " 'format' = 'debezium-json'\n"
+                + " )\n"
+                + " FRESHNESS = INTERVAL "
+                + freshness
+                + "\n"
+                + " AS SELECT \n"
+                + " user_id,\n"
+                + " shop_id,\n"
+                + " ds,\n"
+                + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+                + " SUM (1) AS pv\n"
+                + " FROM (\n"
+                + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource"
+                + " ) AS tmp\n"
+                + " GROUP BY (user_id, shop_id, ds)";
+    }
+
+    /**
+     * Opens a session, creates the test-filesystem catalog of the current test, switches to it
+     * and creates the {@code datagenSource} table.
+     */
+    private SessionHandle initializeSession() {
+        SessionHandle session = service.openSession(defaultSessionEnvironment);
+
+        String createCatalog =
+                String.format(
+                        "CREATE CATALOG %s\n"
+                                + "WITH (\n"
+                                + " 'type' = 'test-filesystem',\n"
+                                + " 'path' = '%s',\n"
+                                + " 'default-database' = '%s'\n"
+                                + " )",
+                        fileSystemCatalogName, fileSystemCatalogPath, TEST_DEFAULT_DATABASE);
+        String useCatalog = String.format("USE CATALOG %s", fileSystemCatalogName);
+        service.configureSession(session, createCatalog, -1);
+        service.configureSession(session, useCatalog, -1);
+
+        // source table read by the materialized table query
+        String createSource =
+                "CREATE TABLE datagenSource (\n"
+                        + " order_id BIGINT,\n"
+                        + " order_number VARCHAR(20),\n"
+                        + " user_id BIGINT,\n"
+                        + " shop_id BIGINT,\n"
+                        + " product_id BIGINT,\n"
+                        + " status BIGINT,\n"
+                        + " order_type BIGINT,\n"
+                        + " order_created_at TIMESTAMP,\n"
+                        + " payment_amount_cents BIGINT\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + " 'connector' = 'datagen',\n"
+                        + " 'rows-per-second' = '10'\n"
+                        + ")";
+        service.configureSession(session, createSource, -1);
+        return session;
+    }
+}