This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 035a2e1 escape column_separator bug fixed. (#121)
035a2e1 is described below
commit 035a2e1c35ade7eb95903a337b79bd8bc71bc447
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 26 17:33:20 2023 +0800
escape column_separator bug fixed. (#121)
---
.../apache/doris/spark/load/DorisStreamLoad.java | 27 ++++++++--
.../org/apache/doris/spark/util/EscapeHandler.java | 40 ---------------
.../doris/spark/rdd/AbstractDorisRDDIterator.scala | 4 +-
.../org/apache/doris/spark/rdd/ScalaDorisRDD.scala | 2 +-
.../apache/doris/spark/rdd/ScalaValueReader.scala | 58 +++++++++++-----------
.../apache/doris/spark/util/EscapeHandlerTest.java | 36 --------------
6 files changed, 55 insertions(+), 112 deletions(-)
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index e1c1bc1..5341f67 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,7 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-
package org.apache.doris.spark.load;
import org.apache.doris.spark.cfg.ConfigurationOptions;
@@ -23,7 +22,6 @@ import org.apache.doris.spark.exception.StreamLoadException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.RespContent;
-import org.apache.doris.spark.util.EscapeHandler;
import org.apache.doris.spark.util.ListUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -105,8 +103,8 @@ public class DorisStreamLoad implements Serializable {
.expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
.build(new BackendCacheLoader(settings));
fileType = streamLoadProp.getOrDefault("format", "csv");
- if ("csv".equals(fileType)){
- FIELD_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
+ if ("csv".equals(fileType)) {
+ FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
} else if ("json".equalsIgnoreCase(fileType)) {
readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
@@ -114,7 +112,7 @@ public class DorisStreamLoad implements Serializable {
throw new IllegalArgumentException("Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true");
}
}
- LINE_DELIMITER = EscapeHandler.escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
+ LINE_DELIMITER = escapeString(streamLoadProp.getOrDefault("line_delimiter", "\n"));
}
public String getLoadUrlStr() {
@@ -303,4 +301,23 @@ public class DorisStreamLoad implements Serializable {
}
+ private String escapeString(String hexData) {
+ if (hexData.startsWith("\\x") || hexData.startsWith("\\X")) {
+ try {
+ hexData = hexData.substring(2);
+ StringBuilder stringBuilder = new StringBuilder();
+ for (int i = 0; i < hexData.length(); i += 2) {
+ String hexByte = hexData.substring(i, i + 2);
+ int decimal = Integer.parseInt(hexByte, 16);
+ char character = (char) decimal;
+ stringBuilder.append(character);
+ }
+ return stringBuilder.toString();
+ } catch (Exception e) {
+ throw new RuntimeException("escape column_separator or line_delimiter error.{}" , e);
+ }
+ }
+ return hexData;
+ }
+
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
deleted file mode 100644
index 87a3989..0000000
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/EscapeHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.util;
-
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class EscapeHandler {
- public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
- public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})");
-
- public static String escapeString(String source) {
- if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
- Matcher m = ESCAPE_PATTERN.matcher(source);
- StringBuffer buf = new StringBuffer();
- while (m.find()) {
- m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16)));
- }
- m.appendTail(buf);
- return buf.toString();
- }
- return source;
- }
-
-}
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
index 5b2b36f..902c634 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDDIterator.scala
@@ -33,7 +33,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T](
private var closed = false
// the reader obtain data from Doris BE
- lazy val reader = {
+ private lazy val reader = {
initialized = true
val settings = partition.settings()
initReader(settings)
@@ -64,7 +64,7 @@ private[spark] abstract class AbstractDorisRDDIterator[T](
createValue(value)
}
- def closeIfNeeded(): Unit = {
+ private def closeIfNeeded(): Unit = {
logger.trace(s"Close status is '$closed' when close Doris RDD Iterator")
if (!closed) {
close()
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
index e764ea0..0ff8bbd 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
@@ -39,7 +39,7 @@ private[spark] class ScalaDorisRDDIterator[T](
partition: PartitionDefinition)
extends AbstractDorisRDDIterator[T](context, partition) {
- override def initReader(settings: Settings) = {
+ override def initReader(settings: Settings): Unit = {
settings.setProperty(DORIS_VALUE_READER_CLASS, classOf[ScalaValueReader].getName)
}
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 9c12cf7..719b16b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -37,11 +37,6 @@ import org.apache.doris.spark.util.ErrorMessages
import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
import org.apache.spark.internal.Logging
-import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
-import scala.collection.JavaConversions._
-import scala.util.Try
import scala.util.control.Breaks
/**
@@ -50,36 +45,45 @@ import scala.util.control.Breaks
* @param partition Doris RDD partition
* @param settings request configuration
*/
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging{
-
- protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
- protected val clientLock =
- if (deserializeArrowToRowBatchAsync) new ReentrantLock()
- else new NoOpLock
- protected var offset = 0
- protected var eos: AtomicBoolean = new AtomicBoolean(false)
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging {
+
+ private[this] lazy val client = new BackendClient(new Routing(partition.getBeAddress), settings)
+
+ private[this] var offset = 0
+
+ private[this] val eos: AtomicBoolean = new AtomicBoolean(false)
+
protected var rowBatch: RowBatch = _
+
// flag indicate if support deserialize Arrow to RowBatch asynchronously
- protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
+ private[this] lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
} getOrElse {
- logWarning(String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)))
+ logWarning(
+ String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE,
+ DORIS_DESERIALIZE_ARROW_ASYNC,
+ settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)
+ )
+ )
DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
}
- protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
+ private[this] val rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
val blockingQueueSize = Try {
settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
} getOrElse {
logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)))
DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
}
-
- var queue: BlockingQueue[RowBatch] = null
if (deserializeArrowToRowBatchAsync) {
- queue = new ArrayBlockingQueue(blockingQueueSize)
+ new ArrayBlockingQueue(blockingQueueSize)
+ } else {
+ null
}
- queue
+ }
+
+ private[this] val clientLock = {
+ if (deserializeArrowToRowBatchAsync) new ReentrantLock() else new NoOpLock
}
private val openParams: TScanOpenParams = {
@@ -87,7 +91,6 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
params.cluster = DORIS_DEFAULT_CLUSTER
params.database = partition.getDatabase
params.table = partition.getTable
-
params.tablet_ids = partition.getTabletIds.toList
params.opaqued_query_plan = partition.getQueryPlan
@@ -129,7 +132,6 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
s"execution memory limit: $execMemLimit, " +
s"user: ${params.getUser}, " +
s"query plan: ${params.getOpaquedQueryPlan}")
-
params
}
@@ -138,8 +140,8 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
protected val schema: Schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns)
- protected val asyncThread: Thread = new Thread {
- override def run {
+ private[this] val asyncThread: Thread = new Thread {
+ override def run(): Unit = {
val nextBatchParams = new TScanNextBatchParams
nextBatchParams.setContextId(contextId)
while (!eos.get) {
@@ -149,17 +151,17 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
if (!eos.get) {
val rowBatch = new RowBatch(nextResult, schema)
offset += rowBatch.getReadRowCount
- rowBatch.close
+ rowBatch.close()
rowBatchBlockingQueue.put(rowBatch)
}
}
}
}
- protected val asyncThreadStarted: Boolean = {
+ private val asyncThreadStarted: Boolean = {
var started = false
if (deserializeArrowToRowBatchAsync) {
- asyncThread.start
+ asyncThread.start()
started = true
}
started
@@ -197,7 +199,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) {
if (rowBatch != null) {
offset += rowBatch.getReadRowCount
- rowBatch.close
+ rowBatch.close()
}
val nextBatchParams = new TScanNextBatchParams
nextBatchParams.setContextId(contextId)
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java
deleted file mode 100644
index d8fb270..0000000
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/EscapeHandlerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.spark.util;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-
-import java.util.Properties;
-
-public class EscapeHandlerTest extends TestCase {
-
- public void testEscapeString() {
-
-
- String s1 = "\\x09\\x09";
- String s2 = "\\x0A\\x0A";
- Assert.assertEquals("\t\t", EscapeHandler.escapeString(s1));
- Assert.assertEquals("\n\n", EscapeHandler.escapeString(s2));
-
- }
-}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]