This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5a3177b [bug] fix bitmap and hll type column access issue (#218)
5a3177b is described below
commit 5a3177be4b49afd5bb5f8f54a1344ad903c03853
Author: gnehil <[email protected]>
AuthorDate: Mon Jul 29 15:54:23 2024 +0800
[bug] fix bitmap and hll type column access issue (#218)
---
.../doris/spark/cfg/ConfigurationOptions.java | 7 --
.../org/apache/doris/spark/rest/RestService.java | 106 +++++++++++----------
.../apache/doris/spark/serialization/RowBatch.java | 17 ++--
.../apache/doris/spark/rdd/ScalaValueReader.scala | 2 +-
.../org/apache/doris/spark/sql/DorisRelation.scala | 5 +
.../org/apache/doris/spark/sql/SchemaUtils.scala | 62 ++++++++----
.../apache/doris/spark/sql/TestSchemaUtils.scala | 36 +++----
7 files changed, 125 insertions(+), 110 deletions(-)
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a0fea83..61d4563 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -93,13 +93,6 @@ public interface ConfigurationOptions {
int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
- /**
- * set types to ignore, split by comma
- * e.g.
- * "doris.ignore-type"="bitmap,hll"
- */
- String DORIS_IGNORE_TYPE = "doris.ignore-type";
-
String DORIS_SINK_ENABLE_2PC = "doris.sink.enable-2pc";
boolean DORIS_SINK_ENABLE_2PC_DEFAULT = false;
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
index 412b6a8..3f3516f 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java
@@ -17,56 +17,21 @@
package org.apache.doris.spark.rest;
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
-import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD;
import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD;
import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_REQUEST_AUTH_USER;
import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
import static
org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_BENODES;
-import static org.apache.doris.spark.util.ErrorMessages.CONNECT_FAILED_MESSAGE;
import static
org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE;
import static
org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE;
import static
org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.PrintWriter;
-import java.io.Serializable;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Base64;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.json.JsonMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.cfg.SparkSettings;
-import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
@@ -76,23 +41,44 @@ import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
+import org.apache.doris.spark.sql.SchemaUtils;
import org.apache.doris.spark.util.HttpUtil;
import org.apache.doris.spark.util.URLs;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
-import org.apache.http.client.config.RequestConfig;
+import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
-import com.google.common.annotations.VisibleForTesting;
-import scala.collection.JavaConverters;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
/**
* Service for communicate with Doris FE.
@@ -238,18 +224,33 @@ public class RestService implements Serializable {
* @throws DorisException throw when find partition failed
*/
public static List<PartitionDefinition> findPartitions(Settings cfg,
Logger logger) throws DorisException {
- String[] tableIdentifiers =
parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger);
- String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") +
- " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1]
+ "`";
- if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) {
- sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY);
+ String[] tableIdentifiers =
+
parseIdentifier(cfg.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER),
logger);
+ String readFields =
cfg.getProperty(ConfigurationOptions.DORIS_READ_FIELD, "*");
+ if (!"*".equals(readFields)) {
+ String[] readFieldArr = readFields.split(",");
+ String[] bitmapColumns =
cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS(), "").split(",");
+ String[] hllColumns =
cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS(), "").split(",");
+ for (int i = 0; i < readFieldArr.length; i++) {
+ String readFieldName = readFieldArr[i].replaceAll("`", "");
+ if (ArrayUtils.contains(bitmapColumns, readFieldName)
+ || ArrayUtils.contains(hllColumns, readFieldName)) {
+ readFieldArr[i] = "'READ UNSUPPORTED' AS " +
readFieldArr[i];
+ }
+ }
+ readFields = StringUtils.join(readFieldArr, ",");
+ }
+ String sql = "select " + readFields + " from `" + tableIdentifiers[0]
+ "`.`" + tableIdentifiers[1] + "`";
+ if
(!StringUtils.isEmpty(cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY)))
{
+ sql += " where " +
cfg.getProperty(ConfigurationOptions.DORIS_FILTER_QUERY);
}
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
String finalSql = sql;
String response = queryAllFrontends((SparkSettings) cfg, (frontend,
enableHttps) -> {
- HttpPost httpPost = new HttpPost(URLs.queryPlan(frontend,
tableIdentifiers[0], tableIdentifiers[1], enableHttps));
- String entity = "{\"sql\": \""+ finalSql +"\"}";
+ HttpPost httpPost =
+ new HttpPost(URLs.queryPlan(frontend, tableIdentifiers[0],
tableIdentifiers[1], enableHttps));
+ String entity = "{\"sql\": \"" + finalSql + "\"}";
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
StringEntity stringEntity = new StringEntity(entity,
StandardCharsets.UTF_8);
stringEntity.setContentEncoding("UTF-8");
@@ -630,10 +631,11 @@ public class RestService implements Serializable {
String user = settings.getProperty(DORIS_REQUEST_AUTH_USER,
"");
String password =
settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "");
logger.info("Send request to Doris FE '{}' with user '{}'.",
request.getURI(), user);
- request.setHeader(HttpHeaders.AUTHORIZATION, "Basic " +
- Base64.getEncoder().encodeToString((user + ":" +
password).getBytes(StandardCharsets.UTF_8)));
+ request.setHeader(HttpHeaders.AUTHORIZATION, "Basic "
+ + Base64.getEncoder().encodeToString((user + ":" +
password).getBytes(StandardCharsets.UTF_8)));
CloseableHttpResponse response = client.execute(request);
- if (response.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
+ StatusLine statusLine = response.getStatusLine();
+ if (statusLine.getStatusCode() == HttpStatus.SC_OK) {
String resStr = EntityUtils.toString(response.getEntity());
Map<String, Object> resMap = MAPPER.readValue(resStr,
new TypeReference<Map<String, Object>>() {
@@ -643,6 +645,8 @@ public class RestService implements Serializable {
}
return resStr;
}
+ logger.warn("Request for {} get a bad status, code: {}, msg:
{}", request.getURI().toString(),
+ statusLine.getStatusCode(),
statusLine.getReasonPhrase());
} catch (IOException e) {
logger.error("Doris FE node {} is unavailable, Request the
next Doris FE node. Err: {}", frontend, e.getMessage());
}
@@ -652,4 +656,4 @@ public class RestService implements Serializable {
throw new DorisException(errMsg);
}
-}
\ No newline at end of file
+}
diff --git
a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index b38c007..feb9a4f 100644
---
a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++
b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -17,6 +17,11 @@
package org.apache.doris.spark.serialization;
+import org.apache.doris.sdk.thrift.TScanBatchResult;
+import org.apache.doris.spark.exception.DorisException;
+import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.IPUtils;
+
import com.google.common.base.Preconditions;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseIntVector;
@@ -43,10 +48,6 @@ import org.apache.arrow.vector.complex.impl.UnionMapReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.commons.lang3.ArrayUtils;
-import org.apache.doris.sdk.thrift.TScanBatchResult;
-import org.apache.doris.spark.exception.DorisException;
-import org.apache.doris.spark.rest.models.Schema;
-import org.apache.doris.spark.util.IPUtils;
import org.apache.spark.sql.types.Decimal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,8 +73,6 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
-import static org.apache.doris.spark.util.IPUtils.convertLongToIPv4Address;
-
/**
* row batch data container.
*/
@@ -157,8 +156,7 @@ public class RowBatch {
private void addValueToRow(int rowIndex, Object obj) {
if (rowIndex > rowCountInOneBatch) {
- String errMsg = "Get row offset: " + rowIndex + " larger than row
size: " +
- rowCountInOneBatch;
+ String errMsg = "Get row offset: " + rowIndex + " larger than row
size: " + rowCountInOneBatch;
logger.error(errMsg);
throw new NoSuchElementException(errMsg);
}
@@ -261,7 +259,8 @@ public class RowBatch {
ipv4Vector = (UInt4Vector) curFieldVector;
}
for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = ipv4Vector.isNull(rowIndex) ?
null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+ Object fieldValue = ipv4Vector.isNull(rowIndex) ?
null :
+
IPUtils.convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
addValueToRow(rowIndex, fieldValue);
}
break;
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 719b16b..f9124a6 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -138,7 +138,7 @@ class ScalaValueReader(partition: PartitionDefinition,
settings: Settings) exten
protected val openResult: TScanOpenResult =
lockClient(_.openScanner(openParams))
protected val contextId: String = openResult.getContextId
protected val schema: Schema =
- SchemaUtils.convertToSchema(openResult.getSelectedColumns)
+ SchemaUtils.convertToSchema(openResult.getSelectedColumns, settings)
private[this] val asyncThread: Thread = new Thread {
override def run(): Unit = {
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index db546ce..aa014ff 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -70,6 +70,11 @@ private[sql] class DorisRelation(
.map(filter => s"($filter)").mkString(" and ")
}
+ val bitmapColumnStr = cfg.getProperty(SchemaUtils.DORIS_BITMAP_COLUMNS, "")
+ paramWithScan += (SchemaUtils.DORIS_BITMAP_COLUMNS -> bitmapColumnStr)
+ val hllColumnStr = cfg.getProperty(SchemaUtils.DORIS_HLL_COLUMNS, "")
+ paramWithScan += (SchemaUtils.DORIS_HLL_COLUMNS -> hllColumnStr)
+
// required columns for column pruner
if (requiredColumns != null && requiredColumns.length > 0) {
paramWithScan += (ConfigurationOptions.DORIS_READ_FIELD ->
diff --git
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 76d231a..e21c6f2 100644
---
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -19,8 +19,9 @@ package org.apache.doris.spark.sql
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.doris.sdk.thrift.TScanColumnDesc
-import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE,
DORIS_READ_FIELD}
+import org.apache.commons.lang3.StringUtils
+import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
+import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
import org.apache.doris.spark.cfg.Settings
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.RestService
@@ -38,6 +39,9 @@ private[spark] object SchemaUtils {
private val logger =
LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
private val MAPPER =
JsonMapper.builder().addModule(DefaultScalaModule).build()
+ val DORIS_BITMAP_COLUMNS = "doris.bitmap.columns"
+ val DORIS_HLL_COLUMNS = "doris.hll.columns"
+
/**
* discover Doris table schema from Doris FE.
*
@@ -46,9 +50,12 @@ private[spark] object SchemaUtils {
*/
def discoverSchema(cfg: Settings): StructType = {
val schema = discoverSchemaFromFe(cfg)
+ val bitmapColumns =
schema.getProperties.filter(_.getType.equalsIgnoreCase("BITMAP")).map(_.getName).mkString(",")
+ cfg.setProperty(DORIS_BITMAP_COLUMNS, bitmapColumns)
+ val hllColumns =
schema.getProperties.filter(_.getType.equalsIgnoreCase("HLL")).map(_.getName).mkString(",")
+ cfg.setProperty(DORIS_HLL_COLUMNS, hllColumns)
val dorisReadField = cfg.getProperty(DORIS_READ_FIELD)
- val ignoreColumnType = cfg.getProperty(DORIS_IGNORE_TYPE)
- convertToStruct(schema, dorisReadField, ignoreColumnType)
+ convertToStruct(schema, dorisReadField)
}
/**
@@ -67,20 +74,14 @@ private[spark] object SchemaUtils {
* @param schema inner schema
* @return Spark Catalyst StructType
*/
- def convertToStruct(schema: Schema, dorisReadFields: String, ignoredTypes:
String): StructType = {
+ def convertToStruct(schema: Schema, dorisReadFields: String): StructType = {
val fieldList = if (dorisReadFields != null && dorisReadFields.nonEmpty) {
dorisReadFields.split(",")
} else {
Array.empty[String]
}
- val ignoredTypeList = if (ignoredTypes != null && ignoredTypes.nonEmpty) {
- ignoredTypes.split(",").map(t => t.trim.toUpperCase)
- } else {
- Array.empty[String]
- }
val fields = schema.getProperties
- .filter(x => (fieldList.contains(x.getName) || fieldList.isEmpty)
- && !ignoredTypeList.contains(x.getType))
+ .filter(x => fieldList.contains(x.getName) || fieldList.isEmpty)
.map(f =>
DataTypes.createStructField(
f.getName,
@@ -132,8 +133,8 @@ private[spark] object SchemaUtils {
case "VARIANT" => DataTypes.StringType
case "IPV4" => DataTypes.StringType
case "IPV6" => DataTypes.StringType
- case "HLL" =>
- throw new DorisException("Unsupported type " + dorisType)
+ case "BITMAP" => DataTypes.StringType // Placeholder only, no
support for reading
+ case "HLL" => DataTypes.StringType // Placeholder only, no
support for reading
case _ =>
throw new DorisException("Unrecognized Doris type " + dorisType)
}
@@ -145,12 +146,39 @@ private[spark] object SchemaUtils {
* @param tscanColumnDescs Doris BE return schema
* @return inner schema struct
*/
- def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc]): Schema = {
- val schema = new Schema(tscanColumnDescs.length)
- tscanColumnDescs.foreach(desc => schema.put(new Field(desc.getName,
desc.getType.name, "", 0, 0, "")))
+ def convertToSchema(tscanColumnDescs: Seq[TScanColumnDesc], settings:
Settings): Schema = {
+ val readColumns = settings.getProperty(DORIS_READ_FIELD,
"").split(",").filter(_.nonEmpty).map(_.replaceAll("`", ""))
+ val bitmapColumns = settings.getProperty(DORIS_BITMAP_COLUMNS,
"").split(",").filter(_.nonEmpty)
+ val hllColumns = settings.getProperty(DORIS_HLL_COLUMNS,
"").split(",").filter(_.nonEmpty)
+ val fieldList = fieldUnion(readColumns, bitmapColumns, hllColumns,
tscanColumnDescs)
+ val schema = new Schema(fieldList.length)
+ fieldList.foreach(schema.put)
schema
}
+ private def fieldUnion(readColumns: Array[String], bitmapColumns:
Array[String], hllColumns: Array[String],
+ tScanColumnDescSeq: Seq[TScanColumnDesc]): List[Field] = {
+ val fieldList = mutable.Buffer[Field]()
+ var rcIdx = 0;
+ var tsdIdx = 0;
+ while (rcIdx < readColumns.length || tsdIdx < tScanColumnDescSeq.length) {
+ if (rcIdx < readColumns.length) {
+ if (StringUtils.equals(readColumns(rcIdx),
tScanColumnDescSeq(tsdIdx).getName)) {
+ fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName,
tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
+ rcIdx += 1
+ tsdIdx += 1
+ } else if (bitmapColumns.contains(readColumns(rcIdx)) ||
hllColumns.contains(readColumns(rcIdx))) {
+ fieldList += new Field(readColumns(rcIdx),
TPrimitiveType.VARCHAR.name, "", 0, 0, "")
+ rcIdx += 1
+ }
+ } else {
+ fieldList += new Field(tScanColumnDescSeq(tsdIdx).getName,
tScanColumnDescSeq(tsdIdx).getType.name, "", 0, 0, "")
+ tsdIdx += 1
+ }
+ }
+ fieldList.toList
+ }
+
def rowColumnValue(row: SpecializedGetters, ordinal: Int, dataType:
DataType): Any = {
if (row.isNullAt(ordinal)) null
diff --git
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
index da797e2..5da7534 100644
---
a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
+++
b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSchemaUtils.scala
@@ -18,8 +18,10 @@
package org.apache.doris.spark.sql
import org.apache.doris.sdk.thrift.{TPrimitiveType, TScanColumnDesc}
+import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
import org.apache.doris.spark.exception.DorisException
import org.apache.doris.spark.rest.models.{Field, Schema}
+import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types._
import org.hamcrest.core.StringStartsWith.startsWith
@@ -42,7 +44,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
fields :+= DataTypes.createStructField("k1", DataTypes.ByteType, true)
fields :+= DataTypes.createStructField("k5", DataTypes.LongType, true)
val expected = DataTypes.createStructType(fields.asJava)
- Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5",
null))
+ Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, "k1,k5"))
}
@Test
@@ -68,10 +70,8 @@ class TestSchemaUtils extends ExpectedExceptionTest {
Assert.assertEquals(DataTypes.StringType,
SchemaUtils.getCatalystType("STRING", 0, 0))
Assert.assertEquals(DataTypes.StringType,
SchemaUtils.getCatalystType("JSON", 0, 0))
Assert.assertEquals(DataTypes.StringType,
SchemaUtils.getCatalystType("JSONB", 0, 0))
-
- thrown.expect(classOf[DorisException])
- thrown.expectMessage(startsWith("Unsupported type"))
- SchemaUtils.getCatalystType("HLL", 0, 0)
+ Assert.assertEquals(DataTypes.StringType,
SchemaUtils.getCatalystType("BITMAP", 0, 0))
+ Assert.assertEquals(DataTypes.StringType,
SchemaUtils.getCatalystType("HLL", 0, 0))
thrown.expect(classOf[DorisException])
thrown.expectMessage(startsWith("Unrecognized Doris type"))
@@ -80,6 +80,11 @@ class TestSchemaUtils extends ExpectedExceptionTest {
@Test
def testConvertToSchema(): Unit = {
+
+ val sparkConf = new SparkConf()
+ sparkConf.set(ConfigurationOptions.DORIS_READ_FIELD, "k1,k2")
+ val settings = new SparkSettings(sparkConf)
+
val k1 = new TScanColumnDesc
k1.setName("k1")
k1.setType(TPrimitiveType.BOOLEAN)
@@ -95,26 +100,7 @@ class TestSchemaUtils extends ExpectedExceptionTest {
expected.put(ek1)
expected.put(ek2)
- Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2)))
- }
-
- @Test
- def testIgnoreTypes(): Unit = {
-
- val schema = new Schema
- schema.setStatus(200)
- val col1 = new Field("col1", "TINYINT", "", 0, 0, "")
- val col2 = new Field("col2", "BITMAP", "", 0, 0, "")
- val col3 = new Field("col3", "HLL", "", 0, 0, "")
- schema.put(col1)
- schema.put(col2)
- schema.put(col3)
-
- var fields = List[StructField]()
- fields :+= DataTypes.createStructField("col1", DataTypes.ByteType, true)
- val expected = DataTypes.createStructType(fields.asJava)
- Assert.assertEquals(expected, SchemaUtils.convertToStruct(schema, null,
"bitmap,hll"))
-
+ Assert.assertEquals(expected, SchemaUtils.convertToSchema(Seq(k1, k2),
settings))
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]