This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new c61342f9 [Improve](case) add customer doris container cluster (#491)
c61342f9 is described below
commit c61342f9651829c72f7828480c831dba52ad8ef3
Author: wudi <[email protected]>
AuthorDate: Fri Sep 20 10:16:09 2024 +0800
[Improve](case) add customer doris container cluster (#491)
---
.../flink/container/AbstractContainerTestBase.java | 8 +-
.../flink/container/instance/ContainerService.java | 2 +
.../flink/container/instance/DorisContainer.java | 5 +
.../container/instance/DorisCustomerContainer.java | 132 +++++++++++++++++++++
.../flink/container/instance/MySQLContainer.java | 5 +
.../apache/doris/flink/sink/DorisSinkITCase.java | 51 +++++++-
6 files changed, 196 insertions(+), 7 deletions(-)
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
index 61e0faac..5c7c151e 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java
@@ -19,6 +19,7 @@ package org.apache.doris.flink.container;
import org.apache.doris.flink.container.instance.ContainerService;
import org.apache.doris.flink.container.instance.DorisContainer;
+import org.apache.doris.flink.container.instance.DorisCustomerContainer;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +49,8 @@ public abstract class AbstractContainerTestBase {
LOG.info("The doris container has been started and is running
status.");
return;
}
- dorisContainerService = new DorisContainer();
+ Boolean customerEnv =
Boolean.valueOf(System.getProperty("customer_env", "false"));
+ dorisContainerService = customerEnv ? new DorisCustomerContainer() :
new DorisContainer();
dorisContainerService.startContainer();
LOG.info("Doris container was started.");
}
@@ -74,9 +76,7 @@ public abstract class AbstractContainerTestBase {
}
protected String getDorisQueryUrl() {
- return String.format(
- "jdbc:mysql://%s:%s",
- getDorisInstanceHost(),
dorisContainerService.getMappedPort(9030));
+ return dorisContainerService.getJdbcUrl();
}
protected String getDorisInstanceHost() {
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
index 6ad1e3cd..684de5a0 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java
@@ -29,6 +29,8 @@ public interface ContainerService {
Connection getQueryConnection();
+ String getJdbcUrl();
+
String getInstanceHost();
Integer getMappedPort(int originalPort);
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
index 6af827b8..ef399d0d 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java
@@ -115,6 +115,11 @@ public class DorisContainer implements ContainerService {
}
}
+ @Override
+ public String getJdbcUrl() { // builds the query URL from this class's JDBC_URL template
+ return String.format(JDBC_URL, dorisContainer.getHost()); // host only; NOTE(review): the port is presumably fixed inside JDBC_URL — confirm
+ }
+
@Override
public String getInstanceHost() {
return dorisContainer.getHost();
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java
new file mode 100644
index 00000000..3d417303
--- /dev/null
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisCustomerContainer.java
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.container.instance;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.doris.flink.exception.DorisRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** Using a custom Doris environment */
+public class DorisCustomerContainer implements ContainerService {
+ private static final Logger LOG =
LoggerFactory.getLogger(DorisCustomerContainer.class);
+ private static final String JDBC_URL = "jdbc:mysql://%s:%s";
+
+ @Override
+ public void startContainer() {
+ LOG.info("Using doris customer containers env.");
+ checkParams();
+ if (!isRunning()) {
+ throw new DorisRuntimeException(
+ "Backend is not alive. Please check the doris cluster.");
+ }
+ }
+
+ private void checkParams() {
+ Preconditions.checkArgument(
+ System.getProperty("doris_host") != null, "doris_host is
required.");
+ Preconditions.checkArgument(
+ System.getProperty("doris_query_port") != null,
"doris_query_port is required.");
+ Preconditions.checkArgument(
+ System.getProperty("doris_http_port") != null,
"doris_http_port is required.");
+ Preconditions.checkArgument(
+ System.getProperty("doris_user") != null, "doris_user is
required.");
+ Preconditions.checkArgument(
+ System.getProperty("doris_passwd") != null, "doris_passwd is
required.");
+ }
+
+ @Override
+ public boolean isRunning() {
+ try (Connection conn = getQueryConnection();
+ Statement stmt = conn.createStatement()) {
+ ResultSet showBackends = stmt.executeQuery("show backends");
+ while (showBackends.next()) {
+ String isAlive = showBackends.getString("Alive").trim();
+ if (Boolean.toString(true).equalsIgnoreCase(isAlive)) {
+ return true;
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Failed to connect doris cluster.", e);
+ return false;
+ }
+ return false;
+ }
+
+ @Override
+ public Connection getQueryConnection() {
+ LOG.info("Try to get query connection from doris.");
+ String jdbcUrl =
+ String.format(
+ JDBC_URL,
+ System.getProperty("doris_host"),
+ System.getProperty("doris_query_port"));
+ try {
+ return DriverManager.getConnection(jdbcUrl, getUsername(),
getPassword());
+ } catch (SQLException e) {
+ LOG.info("Failed to get doris query connection. jdbcUrl={}",
jdbcUrl, e);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return String.format(
+ JDBC_URL, System.getProperty("doris_host"),
System.getProperty("doris_query_port"));
+ }
+
+ @Override
+ public String getInstanceHost() {
+ return System.getProperty("doris_host");
+ }
+
+ @Override
+ public Integer getMappedPort(int originalPort) {
+ return originalPort;
+ }
+
+ @Override
+ public String getUsername() {
+ return System.getProperty("doris_user");
+ }
+
+ @Override
+ public String getPassword() {
+ return System.getProperty("doris_passwd");
+ }
+
+ @Override
+ public String getFenodes() {
+ return System.getProperty("doris_host") + ":" +
System.getProperty("doris_http_port");
+ }
+
+ @Override
+ public String getBenodes() {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
index 21b30e81..4e50ac64 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java
@@ -92,6 +92,11 @@ public class MySQLContainer implements ContainerService {
}
}
+ @Override
+ public String getJdbcUrl() {
+ return mysqlcontainer.getJdbcUrl(); // delegates to the wrapped mysqlcontainer instance
+ }
+
@Override
public void close() {
LOG.info("Stopping MySQL container.");
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 877074ed..80986ea3 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -27,6 +27,7 @@ import
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.StringUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -63,6 +64,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
static final String TABLE_CSV = "tbl_csv";
static final String TABLE_JSON = "tbl_json";
static final String TABLE_JSON_TBL = "tbl_json_tbl";
+ static final String TABLE_TBL_AUTO_REDIRECT = "tbl_tbl_auto_redirect";
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
@@ -177,8 +179,6 @@ public class DorisSinkITCase extends AbstractITCaseService {
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
- + " 'benodes' = '%s',"
- + " 'auto-redirect' = 'false',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
@@ -196,7 +196,6 @@ public class DorisSinkITCase extends AbstractITCaseService {
+ "'"
+ ")",
getFenodes(),
- getBenodes(),
DATABASE + "." + TABLE_JSON_TBL,
getDorisUsername(),
getDorisPassword());
@@ -210,6 +209,52 @@ public class DorisSinkITCase extends AbstractITCaseService
{
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected,
query, 2);
}
+    /** Table sink with auto-redirect off and explicit benodes; skipped when no benodes are set. */
+    @Test
+    public void testTableSinkAutoRedirectFalse() throws Exception {
+        if (StringUtils.isNullOrWhitespaceOnly(getBenodes())) {
+            LOG.info("benodes is empty, skip the test.");
+            return;
+        }
+        initializeTable(TABLE_TBL_AUTO_REDIRECT);
+        String tableIdentifier = DATABASE + "." + TABLE_TBL_AUTO_REDIRECT;
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(DEFAULT_PARALLELISM);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        String sinkDDL =
+                String.format(
+                        "CREATE TABLE doris_sink ("
+                                + " name STRING,"
+                                + " age INT"
+                                + ") WITH ("
+                                + " 'connector' = '"
+                                + DorisConfigOptions.IDENTIFIER
+                                + "',"
+                                + " 'fenodes' = '%s',"
+                                + " 'benodes' = '%s',"
+                                + " 'auto-redirect' = 'false',"
+                                + " 'table.identifier' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'sink.label-prefix' = 'doris_sink"
+                                + UUID.randomUUID()
+                                + "')",
+                        getFenodes(),
+                        getBenodes(),
+                        tableIdentifier,
+                        getDorisUsername(),
+                        getDorisPassword());
+        tEnv.executeSql(sinkDDL);
+        tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
+        // executeSql only submits the INSERT job, so pause before reading this test's own table.
+        Thread.sleep(10000);
+        List<String> expected = Arrays.asList("doris,1", "flink,2");
+        String query = String.format("select name,age from %s order by 1", tableIdentifier);
+        ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
+    }
+
@Test
public void testTableBatch() throws Exception {
initializeTable(TABLE_CSV_BATCH_TBL);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]