This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 86a37dc [Fix] rename arrowwriter and arrowutil (#315)
86a37dc is described below
commit 86a37dcc93b6633b74de2413fa0606f917050bd1
Author: wudi <[email protected]>
AuthorDate: Fri May 9 09:51:39 2025 +0800
[Fix] rename arrowwriter and arrowutil (#315)
---
.../spark/client/write/StreamLoadProcessor.java | 4 +-
.../{ArrowWriter.scala => DorisArrowWriter.scala} | 98 +++++++++++-----------
.../{ArrowUtils.scala => DorisArrowUtils.scala} | 2 +-
3 files changed, 52 insertions(+), 52 deletions(-)
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
index d7e7397..a67ab28 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/StreamLoadProcessor.java
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.arrow.ArrowWriter;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.util.DorisArrowUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -55,7 +55,7 @@ public class StreamLoadProcessor extends
AbstractStreamLoadProcessor<InternalRow
@Override
public byte[] toArrowFormat(List<InternalRow> rowArray) throws IOException
{
- Schema arrowSchema = ArrowUtils.toArrowSchema(schema, "UTC");
+ Schema arrowSchema = DorisArrowUtils.toArrowSchema(schema, "UTC");
VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, new
RootAllocator(Integer.MAX_VALUE));
ArrowWriter arrowWriter = ArrowWriter.create(root);
for (InternalRow row : rowArray) {
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/execution/arrow/DorisArrowWriter.scala
similarity index 71%
rename from spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
rename to spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/execution/arrow/DorisArrowWriter.scala
index 4ef9885..f08af42 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/execution/arrow/DorisArrowWriter.scala
@@ -22,65 +22,65 @@ import org.apache.arrow.vector.complex._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.types._
-import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.sql.util.DorisArrowUtils
import scala.collection.JavaConverters._
/**
* Copied from Spark 3.1.2. To avoid the package conflicts between spark 2 and
spark 3.
*/
-object ArrowWriter {
+object DorisArrowWriter {
- def create(schema: StructType, timeZoneId: String): ArrowWriter = {
- val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
- val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
+ def create(schema: StructType, timeZoneId: String): DorisArrowWriter = {
+ val arrowSchema = DorisArrowUtils.toArrowSchema(schema, timeZoneId)
+ val root = VectorSchemaRoot.create(arrowSchema,
DorisArrowUtils.rootAllocator)
create(root)
}
- def create(root: VectorSchemaRoot): ArrowWriter = {
+ def create(root: VectorSchemaRoot): DorisArrowWriter = {
val children = root.getFieldVectors().asScala.map { vector =>
vector.allocateNew()
createFieldWriter(vector)
}
- new ArrowWriter(root, children.toArray)
+ new DorisArrowWriter(root, children.toArray)
}
- private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
+ private def createFieldWriter(vector: ValueVector): DorisArrowFieldWriter = {
val field = vector.getField()
- (ArrowUtils.fromArrowField(field), vector) match {
- case (BooleanType, vector: BitVector) => new BooleanWriter(vector)
- case (ByteType, vector: TinyIntVector) => new ByteWriter(vector)
- case (ShortType, vector: SmallIntVector) => new ShortWriter(vector)
- case (IntegerType, vector: IntVector) => new IntegerWriter(vector)
- case (LongType, vector: BigIntVector) => new LongWriter(vector)
- case (FloatType, vector: Float4Vector) => new FloatWriter(vector)
- case (DoubleType, vector: Float8Vector) => new DoubleWriter(vector)
+ (DorisArrowUtils.fromArrowField(field), vector) match {
+ case (BooleanType, vector: BitVector) => new DorisBooleanWriter(vector)
+ case (ByteType, vector: TinyIntVector) => new DorisByteWriter(vector)
+ case (ShortType, vector: SmallIntVector) => new DorisShortWriter(vector)
+ case (IntegerType, vector: IntVector) => new DorisIntegerWriter(vector)
+ case (LongType, vector: BigIntVector) => new DorisLongWriter(vector)
+ case (FloatType, vector: Float4Vector) => new DorisFloatWriter(vector)
+ case (DoubleType, vector: Float8Vector) => new DorisDoubleWriter(vector)
case (DecimalType.Fixed(precision, scale), vector: DecimalVector) =>
- new DecimalWriter(vector, precision, scale)
- case (StringType, vector: VarCharVector) => new StringWriter(vector)
- case (BinaryType, vector: VarBinaryVector) => new BinaryWriter(vector)
- case (DateType, vector: DateDayVector) => new DateWriter(vector)
- case (TimestampType, vector: TimeStampMicroTZVector) => new
TimestampWriter(vector)
+ new DorisDecimalWriter(vector, precision, scale)
+ case (StringType, vector: VarCharVector) => new DorisStringWriter(vector)
+ case (BinaryType, vector: VarBinaryVector) => new
DorisBinaryWriter(vector)
+ case (DateType, vector: DateDayVector) => new DorisDateWriter(vector)
+ case (TimestampType, vector: TimeStampMicroTZVector) => new
DorisTimestampWriter(vector)
case (ArrayType(_, _), vector: ListVector) =>
val elementVector = createFieldWriter(vector.getDataVector())
- new ArrayWriter(vector, elementVector)
+ new DorisArrayWriter(vector, elementVector)
case (MapType(_, _, _), vector: MapVector) =>
val structVector = vector.getDataVector.asInstanceOf[StructVector]
val keyWriter =
createFieldWriter(structVector.getChild(MapVector.KEY_NAME))
val valueWriter =
createFieldWriter(structVector.getChild(MapVector.VALUE_NAME))
- new MapWriter(vector, structVector, keyWriter, valueWriter)
+ new DorisMapWriter(vector, structVector, keyWriter, valueWriter)
case (StructType(_), vector: StructVector) =>
val children = (0 until vector.size()).map { ordinal =>
createFieldWriter(vector.getChildByOrdinal(ordinal))
}
- new StructWriter(vector, children.toArray)
+ new DorisStructWriter(vector, children.toArray)
case (dt, _) =>
throw new UnsupportedOperationException(s"Unsupported data type:
${dt.catalogString}")
}
}
}
-class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter])
{
+class DorisArrowWriter(val root: VectorSchemaRoot, fields:
Array[DorisArrowFieldWriter]) {
def schema: StructType = StructType(fields.map { f =>
StructField(f.name, f.dataType, f.nullable)
@@ -109,12 +109,12 @@ class ArrowWriter(val root: VectorSchemaRoot, fields:
Array[ArrowFieldWriter]) {
}
}
-private[spark] abstract class ArrowFieldWriter {
+private[spark] abstract class DorisArrowFieldWriter {
def valueVector: ValueVector
def name: String = valueVector.getField().getName()
- def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField())
+ def dataType: DataType =
DorisArrowUtils.fromArrowField(valueVector.getField())
def nullable: Boolean = valueVector.getField().isNullable()
def setNull(): Unit
@@ -141,7 +141,7 @@ private[spark] abstract class ArrowFieldWriter {
}
}
-private[spark] class BooleanWriter(val valueVector: BitVector) extends
ArrowFieldWriter {
+private[spark] class DorisBooleanWriter(val valueVector: BitVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -152,7 +152,7 @@ private[spark] class BooleanWriter(val valueVector:
BitVector) extends ArrowFiel
}
}
-private[spark] class ByteWriter(val valueVector: TinyIntVector) extends
ArrowFieldWriter {
+private[spark] class DorisByteWriter(val valueVector: TinyIntVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -163,7 +163,7 @@ private[spark] class ByteWriter(val valueVector:
TinyIntVector) extends ArrowFie
}
}
-private[spark] class ShortWriter(val valueVector: SmallIntVector) extends
ArrowFieldWriter {
+private[spark] class DorisShortWriter(val valueVector: SmallIntVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -174,7 +174,7 @@ private[spark] class ShortWriter(val valueVector:
SmallIntVector) extends ArrowF
}
}
-private[spark] class IntegerWriter(val valueVector: IntVector) extends
ArrowFieldWriter {
+private[spark] class DorisIntegerWriter(val valueVector: IntVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -185,7 +185,7 @@ private[spark] class IntegerWriter(val valueVector:
IntVector) extends ArrowFiel
}
}
-private[spark] class LongWriter(val valueVector: BigIntVector) extends
ArrowFieldWriter {
+private[spark] class DorisLongWriter(val valueVector: BigIntVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -196,7 +196,7 @@ private[spark] class LongWriter(val valueVector:
BigIntVector) extends ArrowFiel
}
}
-private[spark] class FloatWriter(val valueVector: Float4Vector) extends
ArrowFieldWriter {
+private[spark] class DorisFloatWriter(val valueVector: Float4Vector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -207,7 +207,7 @@ private[spark] class FloatWriter(val valueVector:
Float4Vector) extends ArrowFie
}
}
-private[spark] class DoubleWriter(val valueVector: Float8Vector) extends
ArrowFieldWriter {
+private[spark] class DorisDoubleWriter(val valueVector: Float8Vector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -218,10 +218,10 @@ private[spark] class DoubleWriter(val valueVector:
Float8Vector) extends ArrowFi
}
}
-private[spark] class DecimalWriter(
+private[spark] class DorisDecimalWriter(
val valueVector: DecimalVector,
precision: Int,
- scale: Int) extends ArrowFieldWriter {
+ scale: Int) extends DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -237,7 +237,7 @@ private[spark] class DecimalWriter(
}
}
-private[spark] class StringWriter(val valueVector: VarCharVector) extends
ArrowFieldWriter {
+private[spark] class DorisStringWriter(val valueVector: VarCharVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -251,8 +251,8 @@ private[spark] class StringWriter(val valueVector:
VarCharVector) extends ArrowF
}
}
-private[spark] class BinaryWriter(
- val valueVector: VarBinaryVector) extends
ArrowFieldWriter {
+private[spark] class DorisBinaryWriter(
+ val valueVector: VarBinaryVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -264,7 +264,7 @@ private[spark] class BinaryWriter(
}
}
-private[spark] class DateWriter(val valueVector: DateDayVector) extends
ArrowFieldWriter {
+private[spark] class DorisDateWriter(val valueVector: DateDayVector) extends
DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -275,8 +275,8 @@ private[spark] class DateWriter(val valueVector:
DateDayVector) extends ArrowFie
}
}
-private[spark] class TimestampWriter(
- val valueVector: TimeStampMicroTZVector)
extends ArrowFieldWriter {
+private[spark] class DorisTimestampWriter(
+ val valueVector: TimeStampMicroTZVector)
extends DorisArrowFieldWriter {
override def setNull(): Unit = {
valueVector.setNull(count)
@@ -287,9 +287,9 @@ private[spark] class TimestampWriter(
}
}
-private[spark] class ArrayWriter(
+private[spark] class DorisArrayWriter(
val valueVector: ListVector,
- val elementWriter: ArrowFieldWriter) extends
ArrowFieldWriter {
+ val elementWriter: DorisArrowFieldWriter)
extends DorisArrowFieldWriter {
override def setNull(): Unit = {
}
@@ -316,9 +316,9 @@ private[spark] class ArrayWriter(
}
}
-private[spark] class StructWriter(
+private[spark] class DorisStructWriter(
val valueVector: StructVector,
- children: Array[ArrowFieldWriter]) extends
ArrowFieldWriter {
+ children: Array[DorisArrowFieldWriter])
extends DorisArrowFieldWriter {
override def setNull(): Unit = {
var i = 0
@@ -351,11 +351,11 @@ private[spark] class StructWriter(
}
}
-private[spark] class MapWriter(
+private[spark] class DorisMapWriter(
val valueVector: MapVector,
val structVector: StructVector,
- val keyWriter: ArrowFieldWriter,
- val valueWriter: ArrowFieldWriter) extends
ArrowFieldWriter {
+ val keyWriter: DorisArrowFieldWriter,
+ val valueWriter: DorisArrowFieldWriter)
extends DorisArrowFieldWriter {
override def setNull(): Unit = {}
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/util/DorisArrowUtils.scala
similarity index 99%
rename from spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
rename to spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/util/DorisArrowUtils.scala
index 48e2305..3879321 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/util/ArrowUtils.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/spark/sql/util/DorisArrowUtils.scala
@@ -28,7 +28,7 @@ import scala.collection.JavaConverters._
/**
* Copied from Spark 3.1.2. To avoid the package conflicts between spark 2 and
spark 3.
*/
-object ArrowUtils {
+object DorisArrowUtils {
val rootAllocator = new RootAllocator(Long.MaxValue)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]