This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new cdbeabd [Improve](case) add doris source it case (#296)
cdbeabd is described below
commit cdbeabd1f12d0b0577d32fe64c57b735669f4a94
Author: wudi <[email protected]>
AuthorDate: Wed Mar 26 16:30:24 2025 +0800
[Improve](case) add doris source it case (#296)
---
.github/workflows/run-itcase-12.yml | 51 ---
.../{run-itcase-20.yml => run-itcase.yml} | 10 +-
.licenserc.yaml | 28 +-
.../spark/client/read/AbstractThriftReader.java | 3 +-
.../spark/client/read/DorisFlightSqlReader.java | 27 +-
.../client/read/ReaderPartitionGenerator.java | 8 +-
.../apache/doris/spark/client/read/RowBatch.java | 16 +-
.../spark/exception/DorisRuntimeException.java | 45 +++
.../apache/doris/spark/rest/models/DataModel.java | 25 ++
.../apache/doris/spark/util/SchemaConvertors.scala | 2 +-
.../doris/spark/client/read/RowBatchTest.java | 15 +-
.../java/org/apache/doris/spark/DorisTestBase.java | 137 --------
.../spark/container/AbstractContainerTestBase.java | 118 +++++++
.../doris/spark/container/ContainerUtils.java | 188 ++++++++++
.../spark/container/instance/ContainerService.java | 52 +++
.../spark/container/instance/DorisContainer.java | 276 +++++++++++++++
.../container/instance/DorisCustomerContainer.java | 138 ++++++++
.../apache/doris/spark/sql/DorisReaderITCase.scala | 386 +++++++++++++++++----
.../apache/doris/spark/sql/DorisWriterITCase.scala | 100 +++---
.../test/resources/container/ddl/read_all_type.sql | 50 +++
.../test/resources/container/ddl/read_bitmap.sql | 17 +
.../src/test/resources/docker/doris/be.conf | 99 ++++++
.../src/test/resources/docker/doris/fe.conf | 74 ++++
.../src/test/resources/log4j.properties | 23 ++
.../read/expression/V2ExpressionBuilder.scala | 15 +-
25 files changed, 1540 insertions(+), 363 deletions(-)
diff --git a/.github/workflows/run-itcase-12.yml b/.github/workflows/run-itcase-12.yml
deleted file mode 100644
index fd28357..0000000
--- a/.github/workflows/run-itcase-12.yml
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
----
-name: Run ITCases 1.2
-on:
- pull_request:
- push:
-
-jobs:
- build-extension:
- name: "Run ITCases 1.2"
- runs-on: ubuntu-latest
- defaults:
- run:
- shell: bash
- steps:
- - name: Checkout
- uses: actions/checkout@master
-
- - name: Setup java
- uses: actions/setup-java@v2
- with:
- distribution: adopt
- java-version: '8'
-
- - name: Run ITCases for spark 2
- run: |
- cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
-
- - name: Run ITCases for spark 3.1
- run: |
- cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
-
- - name: Run ITCases for spark 3.3
- run: |
- cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:1.2.7.1_x86"
diff --git a/.github/workflows/run-itcase-20.yml b/.github/workflows/run-itcase.yml
similarity index 87%
rename from .github/workflows/run-itcase-20.yml
rename to .github/workflows/run-itcase.yml
index b0f31c0..10be11e 100644
--- a/.github/workflows/run-itcase-20.yml
+++ b/.github/workflows/run-itcase.yml
@@ -16,14 +16,14 @@
# under the License.
#
---
-name: Run ITCases 2.0
+name: Run ITCases
on:
pull_request:
push:
jobs:
build-extension:
- name: "Run ITCases 2.0"
+ name: "Run ITCases"
runs-on: ubuntu-latest
defaults:
run:
@@ -40,13 +40,13 @@ jobs:
- name: Run ITCases for spark 2
run: |
- cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
+ cd spark-doris-connector && mvn clean test -Pspark-2-it,spark-2.4_2.11 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
- name: Run ITCases for spark 3.1
run: |
- cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
+ cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.1 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
- name: Run ITCases for spark 3.3
run: |
- cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="adamlee489/doris:2.0.3"
+ cd spark-doris-connector && mvn clean test -Pspark-3-it,spark-3.3 -pl spark-doris-connector-it -am -DfailIfNoTests=false -Dtest="*ITCase" -Dimage="apache/doris:doris-all-in-one-2.1.0"
\ No newline at end of file
diff --git a/.licenserc.yaml b/.licenserc.yaml
index e5af614..4feebdc 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -5,33 +5,15 @@ header:
paths-ignore:
- 'dist'
- - 'licenses'
- - '**/*.md'
- - 'LICENSE'
+ - 'LICENSE.txt'
+ - 'NOTICE.txt'
- 'NOTICE'
- - 'DISCLAIMER'
- - '.clang-format'
- - '.clang-format-ignore'
- - '.gitattributes'
- '.gitignore'
- - '.gitmodules'
+ - '.github/PULL_REQUEST_TEMPLATE.md'
- '.licenserc.yaml'
- - '.rat-excludes'
- - 'be/src/common/status.cpp'
- - 'be/src/common/status.h'
- - 'be/src/env/env.h'
- - 'be/src/env/env_posix.cpp'
- - '**/glibc-compatibility/**'
- - '**/gutil/**'
- - '**/test_data/**'
- - '**/jmockit/**'
- - '**/*.json'
- - '**/*.dat'
- - '**/*.svg'
- - '**/*.md5'
- - '**/*.patch'
- - '**/*.log'
- 'custom_env.sh.tpl'
- '**/*.csv'
+ - '**/jmockit/**'
+ -
'spark-doris-connector/spark-doris-connector-it/src/test/resources/container/'
comment: on-failure
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index 7dbb72b..373910c 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -32,6 +32,7 @@ import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.OptionRequiredException;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.Schema;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -259,7 +260,7 @@ public abstract class AbstractThriftReader extends DorisReader {
             if (readColumn.contains(" AS ")) {
                 int asIdx = readColumn.indexOf(" AS ");
                 String realColumn = readColumn.substring(asIdx + 4).trim().replaceAll("`", "");
-                if (fieldTypeMap.containsKey(realColumn) && scanTypeMap.containsKey(realColumn)
+                if (fieldTypeMap.containsKey(realColumn)
                         && ("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
                         || "HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
                     newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 4623d65..779d622 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -17,17 +17,6 @@
package org.apache.doris.spark.client.read;
-import org.apache.arrow.adbc.core.AdbcConnection;
-import org.apache.arrow.adbc.core.AdbcDatabase;
-import org.apache.arrow.adbc.core.AdbcDriver;
-import org.apache.arrow.adbc.core.AdbcException;
-import org.apache.arrow.adbc.core.AdbcStatement;
-import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
-import org.apache.arrow.flight.Location;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowReader;
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.sdk.thrift.TPrimitiveType;
import org.apache.doris.spark.client.DorisFrontendClient;
import org.apache.doris.spark.client.entity.DorisReaderPartition;
@@ -39,6 +28,18 @@ import org.apache.doris.spark.exception.OptionRequiredException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.Schema;
+
+import org.apache.arrow.adbc.core.AdbcConnection;
+import org.apache.arrow.adbc.core.AdbcDatabase;
+import org.apache.arrow.adbc.core.AdbcDriver;
+import org.apache.arrow.adbc.core.AdbcException;
+import org.apache.arrow.adbc.core.AdbcStatement;
+import org.apache.arrow.adbc.driver.flightsql.FlightSqlDriver;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ import java.util.stream.Collectors;
public class DorisFlightSqlReader extends DorisReader {
     private static final Logger log = LoggerFactory.getLogger(DorisFlightSqlReader.class);
+    private static final String PREFIX = "/* ApplicationName=Spark ArrowFlightSQL Query */";
private final AtomicBoolean endOfStream = new AtomicBoolean(false);
private final DorisFrontendClient frontendClient;
private final Schema schema;
@@ -136,6 +138,7 @@ public class DorisFlightSqlReader extends DorisReader {
     private ArrowReader executeQuery() throws AdbcException, OptionRequiredException {
AdbcStatement statement = connection.createStatement();
String flightSql = generateQuerySql(partition);
+ log.info("Query SQL Sending to Doris FE is: {}", flightSql);
statement.setSqlQuery(flightSql);
AdbcStatement.QueryResult queryResult = statement.executeQuery();
return queryResult.getReader();
@@ -147,7 +150,7 @@ public class DorisFlightSqlReader extends DorisReader {
         String tablets = String.format("TABLET(%s)", StringUtils.join(partition.getTablets(), ","));
         String predicates = partition.getFilters().length == 0 ? "" : " WHERE " + String.join(" AND ", partition.getFilters());
         String limit = partition.getLimit() > 0 ? " LIMIT " + partition.getLimit() : "";
-        return String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit);
+        return PREFIX + String.format("SELECT %s FROM %s %s%s%s", columns, fullTableName, tablets, predicates, limit);
}
     protected Schema processDorisSchema(DorisReaderPartition partition) throws Exception {
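
For illustration, with the new PREFIX constant every Arrow Flight SQL query the reader builds is tagged with an application-name comment. Assuming a hypothetical partition reading columns c1 and c2 from table db.tbl over tablets 10001 and 10002, with one filter and a limit, generateQuerySql now produces a string of this shape (the prefix is concatenated directly, with no separating space):

    /* ApplicationName=Spark ArrowFlightSQL Query */SELECT c1, c2 FROM db.tbl TABLET(10001,10002) WHERE c1 > 0 LIMIT 100

This makes connector traffic easy to pick out in Doris's query logs, and the new log.info line records the same SQL on the Spark side.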
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
index 002b58b..bc6c410 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
@@ -17,7 +17,6 @@
package org.apache.doris.spark.client.read;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.doris.spark.client.DorisFrontendClient;
import org.apache.doris.spark.client.entity.Backend;
import org.apache.doris.spark.client.entity.DorisReaderPartition;
@@ -27,6 +26,8 @@ import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.util.DorisDialects;
+
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class ReaderPartitionGenerator {
originReadCols = new String[0];
}
String[] filters = config.contains(DorisOptions.DORIS_FILTER_QUERY) ?
-                config.getValue(DorisOptions.DORIS_FILTER_QUERY).split("\\.") : new String[0];
+                new String[]{config.getValue(DorisOptions.DORIS_FILTER_QUERY)} : new String[0];
         return generatePartitions(config, originReadCols, filters, -1, datetimeJava8ApiEnabled);
}
@@ -131,7 +132,8 @@ public class ReaderPartitionGenerator {
     protected static String[] getFinalReadColumns(DorisConfig config, DorisFrontendClient frontendClient, String db, String table, String[] readFields) throws Exception {
         Schema tableSchema = frontendClient.getTableSchema(db, table);
-        Map<String, String> fieldTypeMap = tableSchema.getProperties().stream().collect(Collectors.toMap(Field::getName, Field::getType));
+        Map<String, String> fieldTypeMap = tableSchema.getProperties().stream().collect(
+                Collectors.toMap(Field::getName, Field::getType));
         Boolean bitmapToString = config.getValue(DorisOptions.DORIS_READ_BITMAP_TO_STRING);
         Boolean bitmapToBase64 = config.getValue(DorisOptions.DORIS_READ_BITMAP_TO_BASE64);
         return Arrays.stream(readFields).filter(fieldTypeMap::containsKey).map(readField -> {
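
The split("\\.") fix in the first hunk is easiest to see with a concrete filter. A minimal sketch (the filter string is hypothetical): the old code shattered any doris.filter.query value containing a literal dot into meaningless fragments, while the new code passes the configured predicate through as a single filter.

    import java.util.Arrays;

    public class FilterSplitSketch {
        public static void main(String[] args) {
            String filterQuery = "dt = '2024.01.01' AND c1 > 0";
            // old behavior: split on literal dots, producing broken fragments
            String[] oldFilters = filterQuery.split("\\.");
            System.out.println(Arrays.toString(oldFilters));
            // prints: [dt = '2024, 01, 01' AND c1 > 0]
            // new behavior: the whole predicate is kept as one filter
            String[] newFilters = new String[]{filterQuery};
            System.out.println(Arrays.toString(newFilters));
            // prints: [dt = '2024.01.01' AND c1 > 0]
        }
    }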
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index bf576f1..d937edd 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -17,6 +17,11 @@
package org.apache.doris.spark.client.read;
+import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.IPUtils;
+
import com.google.common.base.Preconditions;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseIntVector;
@@ -44,10 +49,6 @@ import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-import org.apache.doris.spark.exception.DorisException;
-import org.apache.doris.spark.rest.models.Schema;
-import org.apache.doris.spark.util.IPUtils;
import org.apache.spark.sql.types.Decimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +65,6 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
-import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
@@ -74,7 +74,6 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.TimeZone;
/**
* row batch data container.
@@ -266,7 +265,7 @@ public class RowBatch implements Serializable {
byte[] bytes = largeIntVector.get(rowIndex);
ArrayUtils.reverse(bytes);
BigInteger largeInt = new BigInteger(bytes);
-                        addValueToRow(rowIndex, Decimal.apply(largeInt));
+ addValueToRow(rowIndex, largeInt.toString());
}
} else {
                     VarCharVector largeIntVector = (VarCharVector) curFieldVector;
@@ -276,8 +275,7 @@ public class RowBatch implements Serializable {
continue;
}
                         String stringValue = new String(largeIntVector.get(rowIndex));
-                        BigInteger largeInt = new BigInteger(stringValue);
-                        addValueToRow(rowIndex, Decimal.apply(largeInt));
+                        addValueToRow(rowIndex, stringValue);
}
}
break;
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java
new file mode 100644
index 0000000..848ac5f
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/exception/DorisRuntimeException.java
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.exception;
+
+/** Doris runtime exception. */
+public class DorisRuntimeException extends RuntimeException {
+ public DorisRuntimeException() {
+ super();
+ }
+
+ public DorisRuntimeException(String message) {
+ super(message);
+ }
+
+ public DorisRuntimeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public DorisRuntimeException(Throwable cause) {
+ super(cause);
+ }
+
+ protected DorisRuntimeException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java
new file mode 100644
index 0000000..a13181e
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/rest/models/DataModel.java
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.rest.models;
+
+public enum DataModel {
+ DUPLICATE,
+ UNIQUE,
+ UNIQUE_MOR,
+ AGGREGATE
+}
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
index e694083..91b8317 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/SchemaConvertors.scala
@@ -42,7 +42,7 @@ object SchemaConvertors {
case "BINARY" => DataTypes.BinaryType
case "DECIMAL" => DecimalType(precision, scale)
case "CHAR" => DataTypes.StringType
- case "LARGEINT" => DecimalType(38, 0)
+ case "LARGEINT" => DataTypes.StringType
case "VARCHAR" => DataTypes.StringType
case "JSON" => DataTypes.StringType
case "JSONB" => DataTypes.StringType
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 5a55a95..09f2e07 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -17,6 +17,12 @@
package org.apache.doris.spark.client.read;
+import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.sdk.thrift.TStatus;
+import org.apache.doris.sdk.thrift.TStatusCode;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.rest.models.Schema;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -51,11 +57,6 @@ import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-import org.apache.doris.sdk.thrift.TStatus;
-import org.apache.doris.sdk.thrift.TStatusCode;
-import org.apache.doris.spark.exception.DorisException;
-import org.apache.doris.spark.rest.models.Schema;
import org.apache.spark.sql.types.Decimal;
import static org.hamcrest.core.StringStartsWith.startsWith;
import org.junit.Assert;
@@ -610,8 +611,8 @@ public class RowBatchTest {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
-        Assert.assertEquals(Decimal.apply(new BigInteger("9223372036854775808")), actualRow0.get(0));
-        Assert.assertEquals(Decimal.apply(new BigInteger("9223372036854775809")), actualRow0.get(1));
+ Assert.assertEquals("9223372036854775808", actualRow0.get(0));
+ Assert.assertEquals("9223372036854775809", actualRow0.get(1));
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java
deleted file mode 100644
index f7c6f0b..0000000
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/DorisTestBase.java
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark;
-
-import static org.awaitility.Awaitility.given;
-import static org.awaitility.Durations.ONE_SECOND;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.Network;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.lifecycle.Startables;
-import org.testcontainers.utility.DockerLoggerFactory;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.LockSupport;
-import java.util.stream.Stream;
-
-public abstract class DorisTestBase {
- public static final String PASSWORD = "";
-    protected static final Logger LOG = LoggerFactory.getLogger(DorisTestBase.class);
-    protected static final String DORIS_DOCKER_IMAGE = System.getProperty("image");
-    protected static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
-    protected static final String URL = "jdbc:mysql://%s:9030";
-    protected static final String USERNAME = "root";
-    protected static final GenericContainer DORIS_CONTAINER = createDorisContainer();
-    private static final String DRIVER_JAR =
-            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
- protected static Connection connection;
-
- protected static String getFenodes() {
- return DORIS_CONTAINER.getHost() + ":8030";
- }
-
- @BeforeClass
- public static void startContainers() {
- LOG.info("Starting containers...");
- Startables.deepStart(Stream.of(DORIS_CONTAINER)).join();
- given().ignoreExceptions()
- .await()
- .atMost(300, TimeUnit.SECONDS)
- .pollInterval(ONE_SECOND)
- .untilAsserted(DorisTestBase::initializeJdbcConnection);
- LOG.info("Containers are started.");
- }
-
- @AfterClass
- public static void stopContainers() {
- LOG.info("Stopping containers...");
- DORIS_CONTAINER.stop();
- LOG.info("Containers are stopped.");
- }
-
- public static GenericContainer createDorisContainer() {
- GenericContainer container =
- new GenericContainer<>(DORIS_DOCKER_IMAGE)
- .withNetwork(Network.newNetwork())
- .withNetworkAliases("DorisContainer")
- .withEnv("FE_SERVERS", "fe1:127.0.0.1:9010")
- .withEnv("FE_ID", "1")
- .withEnv("CURRENT_BE_IP", "127.0.0.1")
- .withEnv("CURRENT_BE_PORT", "9050")
- .withCommand("ulimit -n 65536")
- .withCreateContainerCmdModifier(
- cmd -> cmd.getHostConfig().withMemorySwap(0L))
- .withPrivilegedMode(true)
- .withLogConsumer(
- new Slf4jLogConsumer(
-                                        DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)));
-
- container.setPortBindings(
- Arrays.asList(
- String.format("%s:%s", "8030", "8030"),
- String.format("%s:%s", "9030", "9030"),
- String.format("%s:%s", "9060", "9060"),
- String.format("%s:%s", "8040", "8040")));
-
- return container;
- }
-
-    protected static void initializeJdbcConnection() throws SQLException, MalformedURLException {
- URLClassLoader urlClassLoader =
- new URLClassLoader(
-                        new URL[]{new URL(DRIVER_JAR)}, DorisTestBase.class.getClassLoader());
- LOG.info("Try to connect to Doris...");
- Thread.currentThread().setContextClassLoader(urlClassLoader);
- connection =
- DriverManager.getConnection(
-                        String.format(URL, DORIS_CONTAINER.getHost()), USERNAME, PASSWORD);
- try (Statement statement = connection.createStatement()) {
- ResultSet resultSet;
- do {
- LOG.info("Wait for the Backend to start successfully...");
- resultSet = statement.executeQuery("show backends");
- } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
- }
- LOG.info("Connected to Doris successfully...");
- }
-
-    private static boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
- LockSupport.parkNanos(duration.toNanos());
- if (rs.next()) {
- String isAlive = rs.getString("Alive").trim();
- String totalCap = rs.getString("TotalCapacity").trim();
- return "true".equalsIgnoreCase(isAlive) &&
!"0.000".equalsIgnoreCase(totalCap);
- }
- return false;
- }
-
-}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
new file mode 100644
index 0000000..97e7e26
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/AbstractContainerTestBase.java
@@ -0,0 +1,118 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container;
+
+import org.apache.doris.spark.container.instance.ContainerService;
+import org.apache.doris.spark.container.instance.DorisContainer;
+import org.apache.doris.spark.container.instance.DorisCustomerContainer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public abstract class AbstractContainerTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractContainerTestBase.class);
+ protected static ContainerService dorisContainerService;
+ public static final int DEFAULT_PARALLELISM = 2;
+
+ @BeforeClass
+ public static void initContainers() {
+ LOG.info("Trying to start doris containers.");
+ initDorisContainer();
+ }
+
+ private static void initDorisContainer() {
+        if (Objects.nonNull(dorisContainerService) && dorisContainerService.isRunning()) {
+            LOG.info("The doris container has been started and is running status.");
+            return;
+        }
+        Boolean customerEnv = Boolean.valueOf(System.getProperty("customer_env", "false"));
+        dorisContainerService = customerEnv ? new DorisCustomerContainer() : new DorisContainer();
+ dorisContainerService.startContainer();
+ LOG.info("Doris container was started.");
+ }
+
+ protected static Connection getDorisQueryConnection() {
+ return dorisContainerService.getQueryConnection();
+ }
+
+ protected static Connection getDorisQueryConnection(String database) {
+ return dorisContainerService.getQueryConnection(database);
+ }
+
+ protected String getFenodes() {
+ return dorisContainerService.getFenodes();
+ }
+
+ protected String getBenodes() {
+ return dorisContainerService.getBenodes();
+ }
+
+ protected String getDorisUsername() {
+ return dorisContainerService.getUsername();
+ }
+
+ protected String getDorisPassword() {
+ return dorisContainerService.getPassword();
+ }
+
+ protected String getDorisQueryUrl() {
+ return dorisContainerService.getJdbcUrl();
+ }
+
+ protected String getDorisInstanceHost() {
+ return dorisContainerService.getInstanceHost();
+ }
+
+ public static void closeContainers() {
+ LOG.info("Starting to close containers.");
+ closeDorisContainer();
+ }
+
+ private static void closeDorisContainer() {
+ if (Objects.isNull(dorisContainerService)) {
+ return;
+ }
+ dorisContainerService.close();
+ LOG.info("Doris container was closed.");
+ }
+
+ // ------------------------------------------------------------------------
+ // test utilities
+ // ------------------------------------------------------------------------
+    public static void assertEqualsInAnyOrder(List<Object> expected, List<Object> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    public static void assertEqualsInOrder(List<Object> expected, List<Object> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new Object[0]), actual.toArray(new Object[0]));
+ }
+}
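
A hypothetical subclass, sketching how an IT case consumes this new base class (the test body is made up; the accessor names come from the file above):

    import java.sql.Connection;

    import org.junit.Test;

    public class SmokeITCase extends org.apache.doris.spark.container.AbstractContainerTestBase {
        @Test
        public void smoke() throws Exception {
            String fenodes = getFenodes(); // FE HTTP endpoint, e.g. host:8030
            try (Connection conn = getDorisQueryConnection()) {
                // run queries and assertions against the containerized Doris here
            }
        }
    }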
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java
new file mode 100644
index 0000000..1b54c69
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/ContainerUtils.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.StringJoiner;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.spark.exception.DorisRuntimeException;
+import org.junit.Assert;
+import org.slf4j.Logger;
+
+public class ContainerUtils {
+
+    public static void executeSQLStatement(Connection connection, Logger logger, String... sql) {
+ if (Objects.isNull(sql) || sql.length == 0) {
+ return;
+ }
+ try (Statement statement = connection.createStatement()) {
+ for (String s : sql) {
+ if (StringUtils.isNotEmpty(s)) {
+ logger.info("start to execute sql={}", s);
+ statement.execute(s);
+ }
+ }
+ } catch (SQLException e) {
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ public static List<String> executeSQLStatement(
+ Connection connection, Logger logger, String sql, int columnSize) {
+ List<String> result = new ArrayList<>();
+ if (Objects.isNull(sql)) {
+ return result;
+ }
+ try (Statement statement = connection.createStatement()) {
+ logger.info("start to execute sql={}", sql);
+ ResultSet resultSet = statement.executeQuery(sql);
+
+ while (resultSet.next()) {
+ StringJoiner sb = new StringJoiner(",");
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = resultSet.getObject(i);
+ sb.add(String.valueOf(value));
+ }
+ result.add(sb.toString());
+ }
+ return result;
+ } catch (SQLException e) {
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ public static String loadFileContent(String resourcePath) {
+        try (InputStream stream =
+                ContainerUtils.class.getClassLoader().getResourceAsStream(resourcePath)) {
+            return new BufferedReader(new InputStreamReader(Objects.requireNonNull(stream)))
+ .lines()
+ .collect(Collectors.joining("\n"));
+ } catch (IOException e) {
+ throw new DorisRuntimeException("Failed to read " + resourcePath +
" file.", e);
+ }
+ }
+
+ public static List<String> parseFileArgs(String resourcePath) {
+ String fileContent = ContainerUtils.loadFileContent(resourcePath);
+ String[] args = fileContent.split("\n");
+ List<String> argList = new ArrayList<>();
+ for (String arg : args) {
+ String[] split = arg.trim().split("\\s+");
+ List<String> stringList =
+ Arrays.stream(split)
+ .map(ContainerUtils::removeQuotes)
+ .collect(Collectors.toList());
+ argList.addAll(stringList);
+ }
+ return argList;
+ }
+
+ private static String removeQuotes(String str) {
+ if (str == null || str.length() < 2) {
+ return str;
+ }
+ if (str.startsWith("\"") && str.endsWith("\"")) {
+ return str.substring(1, str.length() - 1);
+ }
+ if (str.startsWith("\\'") && str.endsWith("\\'")) {
+ return str.substring(1, str.length() - 1);
+ }
+ return str;
+ }
+
+ public static String[] parseFileContentSQL(String resourcePath) {
+ String fileContent = loadFileContent(resourcePath);
+        return Arrays.stream(fileContent.split(";")).map(String::trim).toArray(String[]::new);
+ }
+
+ public static void checkResult(
+ Connection connection,
+ Logger logger,
+ List<String> expected,
+ String query,
+ int columnSize) {
+ checkResult(connection, logger, expected, query, columnSize, true);
+ }
+
+ public static void checkResult(
+ Connection connection,
+ Logger logger,
+ List<String> expected,
+ String query,
+ int columnSize,
+ boolean ordered) {
+        List<String> actual = getResult(connection, logger, expected, query, columnSize);
+ if (ordered) {
+ Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+ } else {
+ Assert.assertEquals(expected.size(), actual.size());
+ Assert.assertArrayEquals(
+ expected.stream().sorted().toArray(Object[]::new),
+ actual.stream().sorted().toArray(Object[]::new));
+ }
+ }
+
+ public static List<String> getResult(
+ Connection connection,
+ Logger logger,
+ List<String> expected,
+ String query,
+ int columnSize) {
+ List<String> actual = new ArrayList<>();
+ try (Statement statement = connection.createStatement()) {
+ ResultSet sinkResultSet = statement.executeQuery(query);
+ while (sinkResultSet.next()) {
+ List<String> row = new ArrayList<>();
+ for (int i = 1; i <= columnSize; i++) {
+ Object value = sinkResultSet.getObject(i);
+ if (value == null) {
+ row.add("null");
+ } else {
+ row.add(value.toString());
+ }
+ }
+ actual.add(StringUtils.join(row, ","));
+ }
+ } catch (SQLException e) {
+ logger.info(
+ "Failed to check query result. expected={}, actual={}",
+ String.join(",", expected),
+ String.join(",", actual),
+ e);
+ throw new DorisRuntimeException(e);
+ }
+ logger.info(
+ "checking test result. expected={}, actual={}",
+ String.join(",", expected),
+ String.join(",", actual));
+ return actual;
+ }
+}
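
A minimal sketch of how these helpers compose in a test (the database, table, and rows are hypothetical; the method signatures are the ones added above):

    import java.sql.Connection;
    import java.util.Arrays;

    import org.apache.doris.spark.container.AbstractContainerTestBase;
    import org.apache.doris.spark.container.ContainerUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ContainerUtilsSketch extends AbstractContainerTestBase {
        private static final Logger LOG = LoggerFactory.getLogger(ContainerUtilsSketch.class);

        public void example() {
            Connection conn = getDorisQueryConnection();
            ContainerUtils.executeSQLStatement(conn, LOG,
                    "CREATE DATABASE IF NOT EXISTS demo",
                    "CREATE TABLE demo.t1 (id INT, name STRING) DISTRIBUTED BY HASH(id) BUCKETS 1 "
                            + "PROPERTIES('replication_num' = '1')",
                    "INSERT INTO demo.t1 VALUES (1,'a'),(2,'b')");
            // verify both rows came back, comparing "id,name" strings in order
            ContainerUtils.checkResult(conn, LOG,
                    Arrays.asList("1,a", "2,b"),
                    "SELECT id, name FROM demo.t1 ORDER BY id", 2);
        }
    }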
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
new file mode 100644
index 0000000..3ec7ee5
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/ContainerService.java
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container.instance;
+
+import org.apache.doris.spark.exception.DorisRuntimeException;
+
+import java.sql.Connection;
+
+public interface ContainerService {
+ void startContainer();
+
+ default void restartContainer() {
+ throw new DorisRuntimeException("Only doris docker container can
implemented.");
+ };
+
+ boolean isRunning();
+
+ Connection getQueryConnection();
+
+ Connection getQueryConnection(String database);
+
+ String getJdbcUrl();
+
+ String getInstanceHost();
+
+ Integer getMappedPort(int originalPort);
+
+ String getUsername();
+
+ String getPassword();
+
+ String getFenodes();
+
+ String getBenodes();
+
+ void close();
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
new file mode 100644
index 0000000..7c9297e
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisContainer.java
@@ -0,0 +1,276 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container.instance;
+
+import org.apache.doris.spark.exception.DorisRuntimeException;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.LockSupport;
+
+public class DorisContainer implements ContainerService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisContainer.class);
+    private static final String DEFAULT_DOCKER_IMAGE = "apache/doris:doris-all-in-one-2.1.0";
+    private static final String DORIS_DOCKER_IMAGE =
+            System.getProperty("image") == null
+                    ? DEFAULT_DOCKER_IMAGE
+                    : System.getProperty("image");
+    private static final String DRIVER_JAR =
+            "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
+ private static final String JDBC_URL = "jdbc:mysql://%s:9030";
+ private static final String USERNAME = "root";
+ private static final String PASSWORD = "";
+ private final GenericContainer dorisContainer;
+
+ public DorisContainer() {
+ dorisContainer = createDorisContainer();
+ }
+
+ public GenericContainer createDorisContainer() {
+ LOG.info("Will create doris containers.");
+ GenericContainer container =
+ new GenericContainer<>(DORIS_DOCKER_IMAGE)
+ .withNetwork(Network.newNetwork())
+ .withNetworkAliases("DorisContainer")
+ .withPrivilegedMode(true)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(DORIS_DOCKER_IMAGE)))
+                        // use customer conf
+                        .withCopyFileToContainer(
+                                MountableFile.forClasspathResource("docker/doris/be.conf"),
+                                "/opt/apache-doris/be/conf/be.conf")
+                        .withCopyFileToContainer(
+                                MountableFile.forClasspathResource("docker/doris/fe.conf"),
+                                "/opt/apache-doris/fe/conf/fe.conf")
+ .withExposedPorts(8030, 9030, 8040, 9060, 9611, 9610);
+
+ container.setPortBindings(
+ Lists.newArrayList(
+ String.format("%s:%s", "8030", "8030"),
+ String.format("%s:%s", "9030", "9030"),
+ String.format("%s:%s", "9060", "9060"),
+ String.format("%s:%s", "8040", "8040"),
+ String.format("%s:%s", "9611", "9611"),
+ String.format("%s:%s", "9610", "9610")));
+ return container;
+ }
+
+ public void startContainer() {
+ try {
+ LOG.info("Starting doris containers.");
+ // singleton doris container
+ dorisContainer.start();
+ initializeJdbcConnection();
+ initializeVariables();
+ printClusterStatus();
+ } catch (Exception ex) {
+ LOG.error("Failed to start containers doris", ex);
+ throw new DorisRuntimeException("Failed to start containers
doris", ex);
+ }
+ LOG.info("Doris container started successfully.");
+ }
+
+ @Override
+ public void restartContainer() {
+ LOG.info("Restart doris container.");
+ dorisContainer
+ .getDockerClient()
+ .restartContainerCmd(dorisContainer.getContainerId())
+ .exec();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return dorisContainer.isRunning();
+ }
+
+ @Override
+ public Connection getQueryConnection() {
+ return getQueryConnection("");
+ }
+
+ @Override
+ public Connection getQueryConnection(String database) {
+ LOG.info("Try to get query connection from doris.");
+ String jdbcUrl = String.format(JDBC_URL, dorisContainer.getHost());
+ jdbcUrl = jdbcUrl + "/" + database;
+ try {
+ return DriverManager.getConnection(jdbcUrl, USERNAME, PASSWORD);
+ } catch (SQLException e) {
+ LOG.info("Failed to get doris query connection. jdbcUrl={}",
jdbcUrl, e);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ private void initializeVariables() throws Exception {
+ try (Connection connection = getQueryConnection();
+ Statement statement = connection.createStatement()) {
+ LOG.info("init doris cluster variables.");
+ // avoid arrow flight sql reading bug
+ statement.execute("SET PROPERTY FOR 'root' 'max_user_connections'
= '1024';");
+ }
+ LOG.info("Init variables successfully.");
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return String.format(JDBC_URL, dorisContainer.getHost());
+ }
+
+ @Override
+ public String getInstanceHost() {
+ return dorisContainer.getHost();
+ }
+
+ @Override
+ public Integer getMappedPort(int originalPort) {
+ return dorisContainer.getMappedPort(originalPort);
+ }
+
+ @Override
+ public String getUsername() {
+ return USERNAME;
+ }
+
+ @Override
+ public String getPassword() {
+ return PASSWORD;
+ }
+
+ @Override
+ public String getFenodes() {
+ return dorisContainer.getHost() + ":8030";
+ }
+
+ @Override
+ public String getBenodes() {
+ return dorisContainer.getHost() + ":8040";
+ }
+
+ public void close() {
+ LOG.info("Doris container is about to be close.");
+ dorisContainer.close();
+ LOG.info("Doris container closed successfully.");
+ }
+
+ private void initializeJDBCDriver() throws MalformedURLException {
+ URLClassLoader urlClassLoader =
+ new URLClassLoader(
+                        new URL[] {new URL(DRIVER_JAR)}, DorisContainer.class.getClassLoader());
+ LOG.info("Try to connect to Doris.");
+ Thread.currentThread().setContextClassLoader(urlClassLoader);
+ }
+
+ private void initializeJdbcConnection() throws Exception {
+ initializeJDBCDriver();
+ try (Connection connection = getQueryConnection();
+ Statement statement = connection.createStatement()) {
+ ResultSet resultSet;
+ do {
+ LOG.info("Waiting for the Backend to start successfully.");
+ resultSet = statement.executeQuery("show backends");
+ } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
+ }
+ LOG.info("Connected to Doris successfully.");
+ }
+
+    private boolean isBeReady(ResultSet rs, Duration duration) throws SQLException {
+ LockSupport.parkNanos(duration.toNanos());
+ if (rs.next()) {
+ String isAlive = rs.getString("Alive").trim();
+ String totalCap = rs.getString("TotalCapacity").trim();
+ return Boolean.toString(true).equalsIgnoreCase(isAlive)
+ && !"0.000".equalsIgnoreCase(totalCap);
+ }
+ return false;
+ }
+
+ private void printClusterStatus() throws Exception {
+ LOG.info("Current machine IP: {}", dorisContainer.getHost());
+ echo("sh", "-c", "cat /proc/cpuinfo | grep 'cpu cores' | uniq");
+ echo("sh", "-c", "free -h");
+ try (Connection connection =
+ DriverManager.getConnection(
+                        String.format(JDBC_URL, dorisContainer.getHost()),
+ USERNAME,
+ PASSWORD);
+ Statement statement = connection.createStatement()) {
+ ResultSet showFrontends = statement.executeQuery("show frontends");
+ LOG.info("Frontends status: {}", convertList(showFrontends));
+ ResultSet showBackends = statement.executeQuery("show backends");
+ LOG.info("Backends status: {}", convertList(showBackends));
+ }
+ }
+
+ private void echo(String... cmd) {
+ try {
+ Process p = Runtime.getRuntime().exec(cmd);
+ InputStream is = p.getInputStream();
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
+ String line;
+ while ((line = reader.readLine()) != null) {
+ System.out.println(line);
+ }
+ p.waitFor();
+ is.close();
+ reader.close();
+ p.destroy();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ private List<Map> convertList(ResultSet rs) throws SQLException {
+ List<Map> list = new ArrayList<>();
+ ResultSetMetaData metaData = rs.getMetaData();
+ int columnCount = metaData.getColumnCount();
+ while (rs.next()) {
+ Map<String, Object> rowData = new HashMap<>();
+ for (int i = 1; i <= columnCount; i++) {
+ rowData.put(metaData.getColumnName(i), rs.getObject(i));
+ }
+ list.add(rowData);
+ }
+ return list;
+ }
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
new file mode 100644
index 0000000..4ba4e74
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/container/instance/DorisCustomerContainer.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.container.instance;
+
+import org.apache.doris.spark.exception.DorisRuntimeException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/** Using a custom Doris environment */
+public class DorisCustomerContainer implements ContainerService {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisCustomerContainer.class);
+ private static final String JDBC_URL = "jdbc:mysql://%s:%s";
+
+ @Override
+ public void startContainer() {
+ LOG.info("Using doris customer containers env.");
+ checkParams();
+ if (!isRunning()) {
+ throw new DorisRuntimeException(
+ "Backend is not alive. Please check the doris cluster.");
+ }
+ }
+
+ private void checkParams() {
+        Preconditions.checkArgument(
+                System.getProperty("doris_host") != null, "doris_host is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_query_port") != null, "doris_query_port is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_http_port") != null, "doris_http_port is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_user") != null, "doris_user is required.");
+        Preconditions.checkArgument(
+                System.getProperty("doris_passwd") != null, "doris_passwd is required.");
+ }
+
+ @Override
+ public boolean isRunning() {
+ try (Connection conn = getQueryConnection();
+ Statement stmt = conn.createStatement()) {
+ ResultSet showBackends = stmt.executeQuery("show backends");
+ while (showBackends.next()) {
+ String isAlive = showBackends.getString("Alive").trim();
+ if (Boolean.toString(true).equalsIgnoreCase(isAlive)) {
+ return true;
+ }
+ }
+ } catch (SQLException e) {
+ LOG.error("Failed to connect doris cluster.", e);
+ return false;
+ }
+ return false;
+ }
+
+ @Override
+ public Connection getQueryConnection() {
+ return getQueryConnection("");
+ }
+
+ @Override
+ public Connection getQueryConnection(String database) {
+ LOG.info("Try to get query connection from doris.");
+ String jdbcUrl =
+ String.format(
+ JDBC_URL,
+ System.getProperty("doris_host"),
+ System.getProperty("doris_query_port"));
+ jdbcUrl = jdbcUrl + "/" + database;
+ try {
+            return DriverManager.getConnection(jdbcUrl, getUsername(), getPassword());
+ } catch (SQLException e) {
+ LOG.info("Failed to get doris query connection. jdbcUrl={}",
jdbcUrl, e);
+ throw new DorisRuntimeException(e);
+ }
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return String.format(
+                JDBC_URL, System.getProperty("doris_host"), System.getProperty("doris_query_port"));
+ }
+
+ @Override
+ public String getInstanceHost() {
+ return System.getProperty("doris_host");
+ }
+
+ @Override
+ public Integer getMappedPort(int originalPort) {
+ return originalPort;
+ }
+
+ @Override
+ public String getUsername() {
+ return System.getProperty("doris_user");
+ }
+
+ @Override
+ public String getPassword() {
+ return System.getProperty("doris_passwd");
+ }
+
+ @Override
+ public String getFenodes() {
+ return System.getProperty("doris_host") + ":" +
System.getProperty("doris_http_port");
+ }
+
+ @Override
+ public String getBenodes() {
+ return null;
+ }
+
+ @Override
+ public void close() {}
+}
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
index 2d7930a..67a0688 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisReaderITCase.scala
@@ -17,34 +17,66 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.{DorisTestBase, sparkContextFunctions}
-import org.apache.spark.sql.SparkSession
+import org.apache.doris.spark.container.AbstractContainerTestBase.getDorisQueryConnection
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.doris.spark.rest.models.DataModel
+import org.apache.doris.spark.sparkContextFunctions
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.Test
+import org.junit.Assert.fail
+import org.junit.{Before, Test}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.slf4j.LoggerFactory
-import java.sql.Statement
+import java.sql.{Date, Timestamp}
-class DorisReaderITCase extends DorisTestBase {
+object DorisReaderITCase {
+ @Parameterized.Parameters(name = "readMode: {0}, flightSqlPort: {1}")
+ def parameters(): java.util.Collection[Array[AnyRef]] = {
+ import java.util.Arrays
+ Arrays.asList(
+ Array("thrift": java.lang.String, -1: java.lang.Integer),
+ Array("arrow": java.lang.String, 9611: java.lang.Integer)
+ )
+ }
+}
+
+@RunWith(classOf[Parameterized])
+class DorisReaderITCase(readMode: String, flightSqlPort: Int) extends AbstractContainerTestBase {
- val DATABASE: String = "test"
- val TABLE_READ: String = "tbl_read"
- val TABLE_READ_TBL: String = "tbl_read_tbl"
+ private val LOG = LoggerFactory.getLogger(classOf[DorisReaderITCase])
+
+ val DATABASE = "test_doris_read"
+ val TABLE_READ = "tbl_read"
+ val TABLE_READ_TBL = "tbl_read_tbl"
+ val TABLE_READ_TBL_ALL_TYPES = "tbl_read_tbl_all_types"
+ val TABLE_READ_TBL_BIT_MAP = "tbl_read_tbl_bitmap"
+
+ @Before
+ def setUp(): Unit = {
+ ContainerUtils.executeSQLStatement(getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE))
+ }
@Test
@throws[Exception]
def testRddSource(): Unit = {
- initializeTable(TABLE_READ)
-    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource")
+    initializeTable(TABLE_READ, DataModel.DUPLICATE)
+    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("rddSource")
val sc = new SparkContext(sparkConf)
// sc.setLogLevel("DEBUG")
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some(DATABASE + "." + TABLE_READ),
cfg = Some(Map(
- "doris.fenodes" -> DorisTestBase.getFenodes,
- "doris.request.auth.user" -> DorisTestBase.USERNAME,
- "doris.request.auth.password" -> DorisTestBase.PASSWORD,
- "doris.fe.init.fetch" -> "false"
+ "doris.fenodes" -> getFenodes,
+ "doris.request.auth.user" -> getDorisUsername,
+ "doris.request.auth.password" -> getDorisPassword,
+ "doris.fe.init.fetch" -> "false",
+ "doris.read.mode" -> readMode,
+ "doris.read.arrow-flight-sql.port" -> flightSqlPort.toString
))
)
val result = dorisSparkRDD.collect()
@@ -56,15 +88,17 @@ class DorisReaderITCase extends DorisTestBase {
@Test
@throws[Exception]
def testDataFrameSource(): Unit = {
- initializeTable(TABLE_READ_TBL)
+ initializeTable(TABLE_READ_TBL, DataModel.UNIQUE)
val session = SparkSession.builder().master("local[*]").getOrCreate()
val dorisSparkDF = session.read
.format("doris")
- .option("doris.fenodes", DorisTestBase.getFenodes)
+ .option("doris.fenodes", getFenodes)
.option("doris.table.identifier", DATABASE + "." + TABLE_READ_TBL)
- .option("doris.user", DorisTestBase.USERNAME)
- .option("doris.password", DorisTestBase.PASSWORD)
+ .option("doris.user", getDorisUsername)
+ .option("doris.password", getDorisPassword)
+ .option("doris.read.mode", readMode)
+ .option("doris.read.arrow-flight-sql.port", flightSqlPort.toString)
.load()
val result = dorisSparkDF.collect().toList.toString()
@@ -75,7 +109,7 @@ class DorisReaderITCase extends DorisTestBase {
@Test
@throws[Exception]
def testSQLSource(): Unit = {
- initializeTable(TABLE_READ_TBL)
+ initializeTable(TABLE_READ_TBL, DataModel.UNIQUE_MOR)
val session = SparkSession.builder().master("local[*]").getOrCreate()
session.sql(
s"""
@@ -83,9 +117,11 @@ class DorisReaderITCase extends DorisTestBase {
|USING doris
|OPTIONS(
| "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
- | "fenodes"="${DorisTestBase.getFenodes}",
- | "user"="${DorisTestBase.USERNAME}",
- | "password"="${DorisTestBase.PASSWORD}"
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
|)
|""".stripMargin)
@@ -98,53 +134,44 @@ class DorisReaderITCase extends DorisTestBase {
assert("List([doris,18], [spark,10])".equals(result))
}
- @throws[Exception]
- private def initializeTable(table: String): Unit = {
- try {
- val statement: Statement = DorisTestBase.connection.createStatement
- try {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s",
DATABASE))
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, table))
- statement.execute(String.format("CREATE TABLE %s.%s ( \n" +
- "`name` varchar(256),\n" +
- "`age` int\n" +
- ") DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
- "PROPERTIES (\n" +
- "\"replication_num\" = \"1\"\n" +
- ")\n", DATABASE, table))
- statement.execute(String.format("insert into %s.%s values
('doris',18)", DATABASE, table))
- statement.execute(String.format("insert into %s.%s values
('spark',10)", DATABASE, table))
- } finally {
- if (statement != null) statement.close()
- }
- }
+ private def initializeTable(table: String, dataModel: DataModel): Unit = {
+ val max = if (DataModel.AGGREGATE == dataModel) "MAX" else ""
+    val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else ",\"enable_unique_key_merge_on_write\" = \"false\""
+    val model = if (dataModel == DataModel.UNIQUE_MOR) DataModel.UNIQUE.toString else dataModel.toString
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+ String.format("CREATE TABLE %s.%s ( \n"
+ + "`name` varchar(256),\n"
+ + "`age` int %s\n"
+ + ") "
+ + " %s KEY(`name`) "
+ + " DISTRIBUTED BY HASH(`name`) BUCKETS 1\n"
+ + "PROPERTIES ("
+ + "\"replication_num\" = \"1\"\n" + morProps + ")", DATABASE, table,
max, model),
+ String.format("insert into %s.%s values ('doris',18)", DATABASE, table),
+ String.format("insert into %s.%s values ('spark',10)", DATABASE, table))
}
-  private def compareCollectResult(a1: Array[AnyRef], a2: Array[AnyRef]): Boolean = {
-    if (a1.length == a2.length) {
-      for (idx <- 0 until a1.length) {
-        if (!a1(idx).isInstanceOf[Array[AnyRef]] || !a2(idx).isInstanceOf[Array[AnyRef]]) {
- return false
- }
- val arr1 = a1(idx).asInstanceOf[Array[AnyRef]]
- val arr2 = a2(idx).asInstanceOf[Array[AnyRef]]
- if (arr1.length != arr2.length) {
- return false
- }
- for (idx2 <- 0 until arr2.length) {
- if (arr1(idx2) != arr2(idx2)) {
- return false
- }
- }
+  private def compareCollectResult(a1: Array[AnyRef], a2: Array[AnyRef]): Boolean = if (a1.length == a2.length) {
+    for (idx <- 0 until a1.length) {
+      if (!a1(idx).isInstanceOf[Array[AnyRef]] || !a2(idx).isInstanceOf[Array[AnyRef]]) return false
+ val arr1 = a1(idx).asInstanceOf[Array[AnyRef]]
+ val arr2 = a2(idx).asInstanceOf[Array[AnyRef]]
+ if (arr1.length != arr2.length) return false
+ for (idx2 <- 0 until arr2.length) {
+ if (arr1(idx2) != arr2(idx2)) return false
}
- true
- } else false
- }
+ }
+ true
+ } else false
@Test
@throws[Exception]
def testSQLSourceWithCondition(): Unit = {
- initializeTable(TABLE_READ_TBL)
+ initializeTable(TABLE_READ_TBL, DataModel.AGGREGATE)
val session = SparkSession.builder().master("local[*]").getOrCreate()
session.sql(
s"""
@@ -152,9 +179,11 @@ class DorisReaderITCase extends DorisTestBase {
|USING doris
|OPTIONS(
| "table.identifier"="${DATABASE + "." + TABLE_READ_TBL}",
- | "fenodes"="${DorisTestBase.getFenodes}",
- | "user"="${DorisTestBase.USERNAME}",
- | "password"="${DorisTestBase.PASSWORD}"
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
|)
|""".stripMargin)
@@ -167,5 +196,232 @@ class DorisReaderITCase extends DorisTestBase {
assert("List([doris,18])".equals(result))
}
+ @Test
+ def testReadAllType(): Unit = {
+    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
+
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+    session.sql("desc test_source").show(true)
+ val actualData = session.sql(
+ """
+ |select * from test_source order by id
+ |""".stripMargin).collect()
+ session.stop()
+
+    val expectedData = Array(
+      Row(1, true, 127, 32767, 2147483647, 9223372036854775807L, "170141183460469231731687303715884105727",
+        3.14f, 2.71828, new java.math.BigDecimal("12345.6789"), Date.valueOf("2025-03-11"), Timestamp.valueOf("2025-03-11 12:34:56"), "A", "Hello, Doris!", "This is a string",
+        """["Alice","Bob"]""", Map("key1" -> "value1", "key2" -> "value2"), """{"name":"Tom","age":30}""",
+        """{"key":"value"}""", """{"type":"variant","data":123}"""),
+      Row(2, false, -128, -32768, -2147483648, -9223372036854775808L, "-170141183460469231731687303715884105728",
+        -1.23f, 0.0001, new java.math.BigDecimal("-9999.9999"), Date.valueOf("2024-12-25"), Timestamp.valueOf("2024-12-25 23:59:59"), "B", "Doris Test", "Another string!",
+        """["Charlie","David"]""", Map("k1" -> "v1", "k2" -> "v2"), """{"name":"Jerry","age":25}""",
+        """{"status":"ok"}""", """{"data":[1,2,3]}"""),
+      Row(3, true, 0, 0, 0, 0, "0",
+        0.0f, 0.0, new java.math.BigDecimal("0.0000"), Date.valueOf("2023-06-15"), Timestamp.valueOf("2023-06-15 08:00:00"), "C", "Test Doris", "Sample text",
+        """["Eve","Frank"]""", Map("alpha" -> "beta"), """{"name":"Alice","age":40}""",
+        """{"nested":{"key":"value"}}""", """{"variant":"test"}"""),
+      Row(4, null, null, null, null, null, null,
+        null, null, null, null, null, null, null, null,
+        null, null, null, null, null)
+    )
+
+ val differences = actualData.zip(expectedData).zipWithIndex.flatMap {
+ case ((actualRow, expectedRow), rowIndex) =>
+ actualRow.toSeq.zip(expectedRow.toSeq).zipWithIndex.collect {
+ case ((actualValue, expectedValue), colIndex)
+ if actualValue != expectedValue =>
+ s"Row $rowIndex, Column $colIndex: actual=$actualValue,
expected=$expectedValue"
+ }
+ }
+
+ if (differences.nonEmpty) {
+ fail(s"Data mismatch found:\n${differences.mkString("\n")}")
+ }
+ }
+
+ @Test
+ def testBitmapRead(): Unit = {
+    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+    session.sql("desc test_source").show(true)
+ val actualData = session.sql(
+ """
+ |select * from test_source order by hour
+ |""".stripMargin).collect()
+ session.stop()
+
+ assert("List([20200622,1,Read unsupported], [20200622,2,Read unsupported],
[20200622,3,Read unsupported])".equals(actualData.toList.toString()))
+ }
+
+ @Test
+ def testBitmapRead2String(): Unit = {
+    if (readMode.equals("thrift")) {
+      return
+    }
+    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+ | "doris.read.bitmap-to-string"="true"
+ |)
+ |""".stripMargin)
+    session.sql("desc test_source").show(true)
+ val actualData = session.sql(
+ """
+ |select * from test_source order by hour
+ |""".stripMargin).collect()
+ session.stop()
+
+ assert("List([20200622,1,243], [20200622,2,1,2,3,4,5,434543],
[20200622,3,287667876573])"
+ .equals(actualData.toList.toString()))
+ }
+
+ @Test
+ def testBitmapRead2Base64(): Unit = {
+    if (readMode.equals("thrift")) {
+      return
+    }
+    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_bitmap.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_BIT_MAP}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}",
+ | "doris.read.bitmap-to-base64"="true"
+ |)
+ |""".stripMargin)
+    session.sql("desc test_source").show(true)
+ val actualData = session.sql(
+ """
+ |select * from test_source order by hour
+ |""".stripMargin).collect()
+ session.stop()
+
+ assert("List([20200622,1,AfMAAAA=],
[20200622,2,AjswAQABAAAEAAYAAAABAAEABABvoQ==], [20200622,3,A91yV/pCAAAA])"
+ .equals(actualData.toList.toString()))
+ }
+ @Test
+ def testReadPushDownProject(): Unit = {
+    val sourceInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/read_all_type.sql")
+    ContainerUtils.executeSQLStatement(getDorisQueryConnection(DATABASE), LOG, sourceInitSql: _*)
+ val session = SparkSession.builder().master("local[*]").getOrCreate()
+ session.sql(
+ s"""
+ |CREATE TEMPORARY VIEW test_source
+ |USING doris
+ |OPTIONS(
+ | "table.identifier"="${DATABASE + "." + TABLE_READ_TBL_ALL_TYPES}",
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}",
+ | "doris.read.mode"="${readMode}",
+ | "doris.read.arrow-flight-sql.port"="${flightSqlPort}"
+ |)
+ |""".stripMargin)
+
+ val intFilter = session.sql(
+ """
+        |select id,c1,c2 from test_source where id = 2 and c1 = false and c4 != 3
+ |""".stripMargin).collect()
+
+ assert("List([2,false,-128])".equals(intFilter.toList.toString()))
+
+ val floatFilter = session.sql(
+ """
+ |select id,c3,c4,c7,c9 from test_source where c7 > 0 and c7 < 3.15
+ |""".stripMargin).collect()
+
+    assert("List([1,32767,2147483647,3.14,12345.6789])".equals(floatFilter.toList.toString()))
+
+ val dateFilter = session.sql(
+ """
+        |select id,c10,c11 from test_source where c10 = '2025-03-11' and c13 like 'Hello%'
+ |""".stripMargin).collect()
+
+ assert("List([1,2025-03-11,2025-03-11
12:34:56.0])".equals(dateFilter.toList.toString()))
+
+ val datetimeFilter = session.sql(
+ """
+        |select id,c11,c12 from test_source where c10 < '2025-03-11' and c11 = '2024-12-25 23:59:59'
+ |""".stripMargin).collect()
+
+ assert("List([2,2024-12-25
23:59:59.0,B])".equals(datetimeFilter.toList.toString()))
+
+ val stringFilter = session.sql(
+ """
+        |select id,c13,c14 from test_source where c11 >= '2024-12-25 23:59:59' and c13 = 'Hello, Doris!'
+ |""".stripMargin).collect()
+
+ assert("List([1,Hello, Doris!,This is a
string])".equals(stringFilter.toList.toString()))
+
+ val nullFilter = session.sql(
+ """
+ |select id,c13,c14 from test_source where c14 is null
+ |""".stripMargin).collect()
+
+ assert("List([4,null,null])".equals(nullFilter.toList.toString()))
+
+ val notNullFilter = session.sql(
+ """
+ |select id from test_source where c15 is not null and c12 in ('A', 'B')
+ |""".stripMargin).collect()
+
+ assert("List([1], [2])".equals(notNullFilter.toList.toString()))
+
+ val likeFilter = session.sql(
+ """
+        |select id from test_source where c19 like '%variant%' and c13 like 'Test%'
+ |""".stripMargin).collect()
+
+ assert("List([3])".equals(likeFilter.toList.toString()))
+ session.stop()
+ }
}
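The @Parameterized runner above executes every test in this class twice, once per read mode. A self-contained sketch of the reader configuration the tests exercise; fenodes, user, and password are placeholders for a local cluster, and -1 disables the Arrow Flight port exactly as the thrift row of parameters() does:

    import org.apache.spark.sql.SparkSession

    object ReadModeSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        val df = spark.read
          .format("doris")
          .option("doris.table.identifier", "test_doris_read.tbl_read")
          .option("doris.fenodes", "127.0.0.1:8030")            // placeholder
          .option("doris.user", "root")                         // placeholder
          .option("doris.password", "")                         // placeholder
          .option("doris.read.mode", "arrow")                   // or "thrift"
          .option("doris.read.arrow-flight-sql.port", "9611")   // FE arrow_flight_sql_port
          .load()
        df.show(false)
        spark.stop()
      }
    }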
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
index 6e1bd08..7f1e393 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisWriterITCase.scala
@@ -17,15 +17,17 @@
package org.apache.doris.spark.sql
-import org.apache.doris.spark.DorisTestBase
-import org.apache.spark.sql.types.{ArrayType, DataTypes, DecimalType, MapType, StructField, StructType}
-import org.apache.spark.sql.{Row, SaveMode, SparkSession}
+import org.apache.doris.spark.container.AbstractContainerTestBase.{assertEqualsInAnyOrder, getDorisQueryConnection}
+import org.apache.doris.spark.container.{AbstractContainerTestBase, ContainerUtils}
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.Test
+import org.slf4j.LoggerFactory
-import java.sql.{Date, ResultSet, Statement, Timestamp}
-import scala.collection.mutable.ListBuffer
+import java.util
+import scala.collection.JavaConverters._
-class DorisWriterITCase extends DorisTestBase {
+class DorisWriterITCase extends AbstractContainerTestBase {
+  private val LOG = LoggerFactory.getLogger(classOf[DorisWriterITCase])
val DATABASE: String = "test"
val TABLE_CSV: String = "tbl_csv"
@@ -43,10 +45,10 @@ class DorisWriterITCase extends DorisTestBase {
)).toDF("name", "age")
df.write
.format("doris")
- .option("doris.fenodes", DorisTestBase.getFenodes)
+ .option("doris.fenodes", getFenodes)
.option("doris.table.identifier", DATABASE + "." + TABLE_CSV)
- .option("user", DorisTestBase.USERNAME)
- .option("password", DorisTestBase.PASSWORD)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
.option("sink.properties.column_separator", ",")
.option("sink.properties.line_delimiter", "\n")
.option("sink.properties.format", "csv")
@@ -55,9 +57,13 @@ class DorisWriterITCase extends DorisTestBase {
session.stop()
Thread.sleep(10000)
- val actual = queryResult(TABLE_CSV);
- val expected = ListBuffer(List("doris_csv", 1), List("spark_csv", 2))
- assert(expected.equals(actual))
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_CSV),
+ 2)
+ val expected = util.Arrays.asList("doris_csv,1", "spark_csv,2")
+ checkResultInAnyOrder("testSinkCsvFormat", expected.toArray(),
actual.toArray)
}
@Test
@@ -71,10 +77,10 @@ class DorisWriterITCase extends DorisTestBase {
)).toDF("name", "age")
df.write
.format("doris")
- .option("doris.fenodes", DorisTestBase.getFenodes)
+ .option("doris.fenodes", getFenodes)
.option("doris.table.identifier", DATABASE + "." + TABLE_JSON)
- .option("user", DorisTestBase.USERNAME)
- .option("password", DorisTestBase.PASSWORD)
+ .option("user", getDorisUsername)
+ .option("password", getDorisPassword)
.option("sink.properties.read_json_by_line", "true")
.option("sink.properties.format", "json")
.option("doris.sink.auto-redirect", "false")
@@ -83,9 +89,13 @@ class DorisWriterITCase extends DorisTestBase {
session.stop()
Thread.sleep(10000)
- val actual = queryResult(TABLE_JSON);
- val expected = ListBuffer(List("doris_json", 1), List("spark_json", 2))
- assert(expected.equals(actual))
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_JSON),
+ 2)
+ val expected = util.Arrays.asList("doris_json,1", "spark_json,2");
+ checkResultInAnyOrder("testSinkJsonFormat", expected.toArray,
actual.toArray)
}
@Test
@@ -104,9 +114,9 @@ class DorisWriterITCase extends DorisTestBase {
|USING doris
|OPTIONS(
| "table.identifier"="${DATABASE + "." + TABLE_JSON_TBL}",
- | "fenodes"="${DorisTestBase.getFenodes}",
- | "user"="${DorisTestBase.USERNAME}",
- | "password"="${DorisTestBase.PASSWORD}"
+ | "fenodes"="${getFenodes}",
+ | "user"="${getDorisUsername}",
+ | "password"="${getDorisPassword}"
|)
|""".stripMargin)
session.sql(
@@ -116,40 +126,34 @@ class DorisWriterITCase extends DorisTestBase {
session.stop()
Thread.sleep(10000)
- val actual = queryResult(TABLE_JSON_TBL);
- val expected = ListBuffer(List("doris_tbl", 1), List("spark_tbl", 2))
- assert(expected.equals(actual))
+ val actual = ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("select * from %s.%s", DATABASE, TABLE_JSON_TBL),
+ 2)
+ val expected = util.Arrays.asList("doris_tbl,1", "spark_tbl,2");
+ checkResultInAnyOrder("testSQLSinkFormat", expected.toArray,
actual.toArray)
}
- private def queryResult(table: String): ListBuffer[Any] = {
- val actual = new ListBuffer[Any]
- try {
- val sinkStatement: Statement = DorisTestBase.connection.createStatement
- try {
-        val sinkResultSet: ResultSet = sinkStatement.executeQuery(String.format("select name,age from %s.%s order by 1", DATABASE, table))
-        while (sinkResultSet.next) {
-          val row = List(sinkResultSet.getString("name"), sinkResultSet.getInt("age"))
- actual += row
- }
- } finally if (sinkStatement != null) sinkStatement.close()
- }
- actual
- }
@throws[Exception]
private def initializeTable(table: String): Unit = {
- try {
- val statement: Statement = DorisTestBase.connection.createStatement
- try {
- statement.execute(String.format("CREATE DATABASE IF NOT EXISTS %s",
DATABASE))
- statement.execute(String.format("DROP TABLE IF EXISTS %s.%s",
DATABASE, table))
- statement.execute(String.format(
- "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n"
+ ") " +
+ ContainerUtils.executeSQLStatement(
+ getDorisQueryConnection,
+ LOG,
+ String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE),
+ String.format("DROP TABLE IF EXISTS %s.%s", DATABASE, table),
+      String.format(
+        "CREATE TABLE %s.%s ( \n" + "`name` varchar(256),\n" + "`age` int\n" + ") " +
"DISTRIBUTED BY HASH(`name`) BUCKETS 1\n" +
"PROPERTIES (\n" +
- "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table))
- } finally if (statement != null) statement.close()
- }
+ "\"replication_num\" = \"1\"\n" + ")\n", DATABASE, table)
+ )
+ }
+
+  private def checkResultInAnyOrder(testName: String, expected: Array[AnyRef], actual: Array[AnyRef]): Unit = {
+    LOG.info("Checking DorisWriterITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
+    assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
}
}
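The rewritten writer assertions fetch rows back as "name,age" strings and compare them without regard to order. assertEqualsInAnyOrder itself lives in AbstractContainerTestBase and is not shown in this diff; a hedged, self-contained equivalent of the check:

    object AnyOrderCheckSketch {
      def main(args: Array[String]): Unit = {
        val expected = List("doris_csv,1", "spark_csv,2")
        val actual = List("spark_csv,2", "doris_csv,1")
        // Sorting both sides makes the comparison independent of row order,
        // since Doris returns these scan results unsorted.
        assert(expected.sorted == actual.sorted, s"mismatch: $expected vs $actual")
        println("rows match in any order")
      }
    }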
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql
new file mode 100644
index 0000000..7632e02
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_all_type.sql
@@ -0,0 +1,50 @@
+DROP TABLE IF EXISTS tbl_read_tbl_all_types;
+
+CREATE TABLE tbl_read_tbl_all_types (
+`id` int,
+`c1` boolean,
+`c2` tinyint,
+`c3` smallint,
+`c4` int,
+`c5` bigint,
+`c6` largeint,
+`c7` float,
+`c8` double,
+`c9` decimal(12,4),
+`c10` date,
+`c11` datetime,
+`c12` char(1),
+`c13` varchar(16),
+`c14` string,
+`c15` Array<String>,
+`c16` Map<String, String>,
+`c17` Struct<name: String, age: int>,
+`c18` JSON,
+`c19` JSON -- Doris 2.1.0 cannot read VARIANT
+)
+DUPLICATE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 2
+PROPERTIES (
+"replication_num" = "1",
+"light_schema_change" = "true"
+);
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (1, true, 127, 32767, 2147483647, 9223372036854775807, 170141183460469231731687303715884105727,
+    3.14, 2.71828, 12345.6789, '2025-03-11', '2025-03-11 12:34:56', 'A', 'Hello, Doris!', 'This is a string',
+    ['Alice', 'Bob'], {'key1': 'value1', 'key2': 'value2'}, STRUCT('Tom', 30), '{"key": "value"}', '{"type": "variant", "data": 123}');
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (2, false, -128, -32768, -2147483648, -9223372036854775808, -170141183460469231731687303715884105728,
+    -1.23, 0.0001, -9999.9999, '2024-12-25', '2024-12-25 23:59:59', 'B', 'Doris Test', 'Another string!',
+    ['Charlie', 'David'], {'k1': 'v1', 'k2': 'v2'}, STRUCT('Jerry', 25), '{"status": "ok"}', '{"data": [1, 2, 3]}' );
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (3, true, 0, 0, 0, 0, 0,
+    0.0, 0.0, 0.0000, '2023-06-15', '2023-06-15 08:00:00', 'C', 'Test Doris', 'Sample text',
+    ['Eve', 'Frank'], {'alpha': 'beta'}, STRUCT('Alice', 40), '{"nested": {"key": "value"}}', '{"variant": "test"}');
+
+INSERT INTO tbl_read_tbl_all_types VALUES
+    (4, NULL, NULL, NULL, NULL, NULL, NULL,
+    NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+    NULL, NULL, NULL, NULL, NULL);
\ No newline at end of file
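ContainerUtils.parseFileContentSQL consumes scripts like this one; its implementation is not part of this excerpt, but a plausible minimal equivalent loads the classpath resource and splits it into statements on semicolons (an assumption, not the connector's actual parser):

    import scala.io.Source

    object SqlScriptSplitSketch {
      def main(args: Array[String]): Unit = {
        val script = Source.fromResource("container/ddl/read_all_type.sql").mkString
        // A naive split is safe here because no literal in this DDL contains ';'.
        val statements = script.split(";").map(_.trim).filter(_.nonEmpty)
        statements.foreach(s => println("would execute: " + s.takeWhile(_ != '\n')))
      }
    }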
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql
new file mode 100644
index 0000000..8e89178
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/container/ddl/read_bitmap.sql
@@ -0,0 +1,17 @@
+DROP TABLE IF EXISTS tbl_read_tbl_bitmap;
+
+create table tbl_read_tbl_bitmap (
+datekey int,
+hour int,
+device_id bitmap BITMAP_UNION
+)
+aggregate key (datekey, hour)
+distributed by hash(datekey, hour) buckets 1
+properties(
+ "replication_num" = "1"
+);
+
+insert into tbl_read_tbl_bitmap values
+(20200622, 1, to_bitmap(243)),
+(20200622, 2, bitmap_from_array([1,2,3,4,5,434543])),
+(20200622, 3, to_bitmap(287667876573));
\ No newline at end of file
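Bitmap columns cannot be materialized by the reader directly (the tests expect the literal string "Read unsupported"); in Arrow Flight SQL mode the connector can instead render them with the options exercised by testBitmapRead2String and testBitmapRead2Base64. A sketch with placeholder connection values:

    import org.apache.spark.sql.SparkSession

    object BitmapReadSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").getOrCreate()
        val df = spark.read.format("doris")
          .option("doris.table.identifier", "test_doris_read.tbl_read_tbl_bitmap")
          .option("doris.fenodes", "127.0.0.1:8030")          // placeholder
          .option("doris.user", "root")                       // placeholder
          .option("doris.password", "")                       // placeholder
          .option("doris.read.mode", "arrow")
          .option("doris.read.arrow-flight-sql.port", "9611")
          .option("doris.read.bitmap-to-string", "true")      // or doris.read.bitmap-to-base64
          .load()
        df.show(false)
        spark.stop()
      }
    }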
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf
new file mode 100644
index 0000000..94b76e0
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/be.conf
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+PPROF_TMPDIR="$DORIS_HOME/log/"
+
+JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log
-Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE
-Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true
-Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 9+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives"
+
+# For jdk 17+, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives --add-opens=java.base/java.net=ALL-UNNAMED"
+
+# since 1.2, the JAVA_HOME need to be set to run BE process.
+# JAVA_HOME=/path/to/jdk/
+
+# https://github.com/apache/doris/blob/master/docs/zh-CN/community/developer-guide/debug-tool.md#jemalloc-heap-profile
+# https://jemalloc.net/jemalloc.3.html
+JEMALLOC_CONF="percpu_arena:percpu,background_thread:true,metadata_thp:auto,muzzy_decay_ms:15000,dirty_decay_ms:15000,oversize_threshold:0,prof:false,lg_prof_interval:32,lg_prof_sample:19,prof_gdump:false,prof_accum:false,prof_leak:false,prof_final:false"
+JEMALLOC_PROF_PRFIX=""
+
+# INFO, WARNING, ERROR, FATAL
+sys_log_level = INFO
+
+# ports for admin, web, heartbeat service
+be_port = 9060
+webserver_port = 8040
+heartbeat_service_port = 9050
+brpc_port = 8060
+arrow_flight_sql_port = 9610
+enable_debug_points = true
+
+# HTTPS configures
+enable_https = false
+# path of certificate in PEM format.
+ssl_certificate_path = "$DORIS_HOME/conf/cert.pem"
+# path of private key in PEM format.
+ssl_private_key_path = "$DORIS_HOME/conf/key.pem"
+
+
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# data root path, separate by ';'
+# You can specify the storage type for each root path, HDD (cold data) or SSD (hot data)
+# eg:
+# storage_root_path = /home/disk1/doris;/home/disk2/doris;/home/disk2/doris
+# storage_root_path = /home/disk1/doris,medium:SSD;/home/disk2/doris,medium:SSD;/home/disk2/doris,medium:HDD
+# /home/disk2/doris,medium:HDD(default)
+#
+# you can also specify the properties by setting '<property>:<value>', separated by ','
+# property 'medium' has a higher priority than the extension of path
+#
+# Default value is ${DORIS_HOME}/storage, you should create it by hand.
+# storage_root_path = ${DORIS_HOME}/storage
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+# Advanced configurations
+# sys_log_dir = ${DORIS_HOME}/log
+# sys_log_roll_mode = SIZE-MB-1024
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = *
+# log_buffer_level = -1
+# palo_cgroups
+
+# aws sdk log level
+# Off = 0,
+# Fatal = 1,
+# Error = 2,
+# Warn = 3,
+# Info = 4,
+# Debug = 5,
+# Trace = 6
+# Default to turn off aws sdk log, because aws sdk errors that need attention will be output through Doris logs
+aws_log_level=0
+## If you are not running in aws cloud, you can disable EC2 metadata
+AWS_EC2_METADATA_DISABLED=true
\ No newline at end of file
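The ports above (be_port 9060, webserver_port 8040, arrow_flight_sql_port 9610) are what the docker-based tests expose. A hedged sketch for checking that a locally started BE is reachable on them; host and timeout are assumptions:

    import java.net.{InetSocketAddress, Socket}

    object BePortProbeSketch {
      def probe(host: String, port: Int): Boolean = {
        val socket = new Socket()
        try { socket.connect(new InetSocketAddress(host, port), 2000); true }
        catch { case _: Exception => false }
        finally socket.close()
      }
      def main(args: Array[String]): Unit =
        Seq(9060, 8040, 9610).foreach(p => println(s"BE port $p reachable: ${probe("127.0.0.1", p)}"))
    }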
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf
new file mode 100644
index 0000000..a45fb53
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/docker/doris/fe.conf
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#####################################################################
+## The uppercase properties are read and exported by bin/start_fe.sh.
+## To see all Frontend configurations,
+## see fe/src/org/apache/doris/common/Config.java
+#####################################################################
+
+CUR_DATE=`date +%Y%m%d-%H%M%S`
+
+# Log dir
+LOG_DIR = ${DORIS_HOME}/log
+
+# For jdk 17, this JAVA_OPTS will be used as default JVM options
+JAVA_OPTS_FOR_JDK_17="-Dfile.encoding=UTF-8
-Djavax.security.auth.useSubjectCredsOnly=false -Xmx8192m -Xms8192m
-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=$LOG_DIR
-Xlog:gc*,classhisto*=trace:$LOG_DIR/fe.gc.log.$CUR_DATE:time,uptime:filecount=10,filesize=50M
--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens
java.base/jdk.internal.ref=ALL-UNNAMED"
+
+# Set your own JAVA_HOME
+# JAVA_HOME=/path/to/jdk/
+
+##
+## the lowercase properties are read by main program.
+##
+
+# store metadata, must be created before start FE.
+# Default value is ${DORIS_HOME}/doris-meta
+# meta_dir = ${DORIS_HOME}/doris-meta
+
+# Default dirs to put jdbc drivers,default value is ${DORIS_HOME}/jdbc_drivers
+# jdbc_drivers_dir = ${DORIS_HOME}/jdbc_drivers
+
+http_port = 8030
+rpc_port = 9020
+query_port = 9030
+edit_log_port = 9010
+arrow_flight_sql_port = 9611
+enable_debug_points = true
+arrow_flight_token_cache_size = 50
+# Choose one if there are more than one ip except loopback address.
+# Note that there should at most one ip match this list.
+# If no ip match this rule, will choose one randomly.
+# use CIDR format, e.g. 10.10.10.0/24 or IP format, e.g. 10.10.10.1
+# Default value is empty.
+# priority_networks = 10.10.10.0/24;192.168.0.0/16
+
+# Advanced configurations
+# log_roll_size_mb = 1024
+# INFO, WARN, ERROR, FATAL
+sys_log_level = INFO
+# NORMAL, BRIEF, ASYNC
+sys_log_mode = ASYNC
+# sys_log_roll_num = 10
+# sys_log_verbose_modules = org.apache.doris
+# audit_log_dir = $LOG_DIR
+# audit_log_modules = slow_query, query
+# audit_log_roll_num = 10
+# meta_delay_toleration_second = 10
+# qe_max_connection = 1024
+# qe_query_timeout_second = 300
+# qe_slow_log_ms = 5000
\ No newline at end of file
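query_port 9030 above is the FE's MySQL-protocol port, which is how the IT base classes talk to the cluster. A minimal sketch of the same liveness check isRunning() performs, assuming a MySQL JDBC driver on the classpath and placeholder credentials:

    import java.sql.DriverManager

    object FeJdbcSketch {
      def main(args: Array[String]): Unit = {
        val conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030", "root", "")
        try {
          // The same probe DorisCustomerContainer.isRunning() issues.
          val rs = conn.createStatement().executeQuery("show backends")
          while (rs.next()) println("BE alive: " + rs.getString("Alive"))
        } finally conn.close()
      }
    }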
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
new file mode 100644
index 0000000..ecb73d3
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+log4j.rootLogger=INFO, console
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c [%t] %x - %m%n
\ No newline at end of file
diff --git a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
index f13830c..f7e08b9 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3.3/src/main/scala/org/apache/doris/spark/read/expression/V2ExpressionBuilder.scala
@@ -17,8 +17,10 @@
package org.apache.doris.spark.read.expression
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue, And, Not, Or}
import org.apache.spark.sql.connector.expressions.{Expression, GeneralScalarExpression, Literal, NamedReference}
+import org.apache.spark.sql.types.{DateType, TimestampType}
class V2ExpressionBuilder(inValueLengthLimit: Int) {
@@ -36,7 +38,7 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
case _: AlwaysFalse => "1=0"
case expr: Expression =>
expr match {
- case literal: Literal[_] => literal.toString
+ case literal: Literal[_] => visitLiteral(literal)
case namedRef: NamedReference => namedRef.toString
case e: GeneralScalarExpression => e.name() match {
case "IN" =>
@@ -61,6 +63,17 @@ class V2ExpressionBuilder(inValueLengthLimit: Int) {
}
}
+ def visitLiteral(literal: Literal[_]): String = {
+ if (literal.value() == null) {
+ return "null"
+ }
+ literal.dataType() match {
+      case DateType => s"'${DateTimeUtils.toJavaDate(literal.value().asInstanceOf[Int]).toString}'"
+      case TimestampType => s"'${DateTimeUtils.toJavaTimestamp(literal.value().asInstanceOf[Long]).toString}'"
+ case _ => literal.toString
+ }
+ }
+
def visitStartWith(l: String, r: String): String = {
val value = r.substring(1, r.length - 1)
s"`$l` LIKE '$value%'"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]