This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch 1.1.2-pick
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/1.1.2-pick by this push:
new c9389f06 fix
c9389f06 is described below
commit c9389f06752c47150bf7a7a9e5760e61910a8c39
Author: wudi <[email protected]>
AuthorDate: Tue Feb 10 12:15:32 2026 +0800
fix
---
flink-doris-connector/pom.xml | 2 +-
.../apache/doris/flink/serialization/RowBatch.java | 644 +++++++++++++++------
.../org/apache/doris/flink/util/FastDateUtil.java | 90 +++
3 files changed, 554 insertions(+), 182 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index 0847d4a0..a9737ee2 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -27,7 +27,7 @@ under the License.
</parent>
<groupId>org.apache.doris</groupId>
<artifactId>flink-doris-connector-1.14_2.12</artifactId>
- <version>1.1.2</version>
+ <version>1.1.3</version>
<name>Flink Doris Connector</name>
<url>https://doris.apache.org/</url>
<licenses>
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index a3c4d673..88db7bfb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -18,44 +18,62 @@
package org.apache.doris.flink.serialization;
import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.util.FastDateUtil;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.Decimal256Vector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.DateDayReaderImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl;
+import org.apache.arrow.vector.complex.impl.UnionMapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
+import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.Types;
-import org.apache.flink.util.Preconditions;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
-/**
- * row batch data container.
- */
+/** row batch data container. */
public class RowBatch {
- private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
+ private static final Logger logger =
LoggerFactory.getLogger(RowBatch.class);
public static class Row {
- private List<Object> cols;
+ private final List<Object> cols;
Row(int colCount) {
this.cols = new ArrayList<>(colCount);
@@ -71,18 +89,24 @@ public class RowBatch {
}
// offset for iterate the rowBatch
- private int offsetInRowBatch = 0;
+ private int offsetInRowBatch;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
- private List<Row> rowBatch = new ArrayList<>();
- private final ArrowStreamReader arrowStreamReader;
+ private final List<Row> rowBatch = new ArrayList<>();
+ private final ArrowReader arrowStreamReader;
private VectorSchemaRoot root;
private List<FieldVector> fieldVectors;
private RootAllocator rootAllocator;
private final Schema schema;
-
- private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
+ private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ private static final String DATE_PATTERN = "yyyy-MM-dd";
+ private final DateTimeFormatter dateTimeFormatter =
+ DateTimeFormatter.ofPattern(DATETIME_PATTERN);
+ private final DateTimeFormatter dateTimeV2Formatter =
+ DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
+ private final DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern(DATE_PATTERN);
+ private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
public List<Row> getRowBatch() {
return rowBatch;
@@ -91,24 +115,63 @@ public class RowBatch {
public RowBatch(TScanBatchResult nextResult, Schema schema) {
this.schema = schema;
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
- this.arrowStreamReader = new ArrowStreamReader(
- new ByteArrayInputStream(nextResult.getRows()),
- rootAllocator
- );
+ this.arrowStreamReader =
+ new ArrowStreamReader(
+ new ByteArrayInputStream(nextResult.getRows()),
rootAllocator);
+ this.offsetInRowBatch = 0;
+ }
+
+ public RowBatch(ArrowReader nextResult, Schema schema) {
+ this.schema = schema;
+ this.arrowStreamReader = nextResult;
this.offsetInRowBatch = 0;
}
- public RowBatch readArrow() throws DorisException {
+ public RowBatch readFlightArrow() {
+ try {
+ this.root = arrowStreamReader.getVectorSchemaRoot();
+ fieldVectors = root.getFieldVectors();
+ if (fieldVectors.size() > schema.size()) {
+ logger.error(
+ "Schema size '{}' is not equal to arrow field size
'{}'.",
+ fieldVectors.size(),
+ schema.size());
+ throw new DorisException(
+ "Load Doris data failed, schema size of fetch data is
wrong.");
+ }
+ if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
+ logger.debug("One batch in arrow has no data.");
+ return null;
+ }
+ rowCountInOneBatch = root.getRowCount();
+ for (int i = 0; i < rowCountInOneBatch; ++i) {
+ rowBatch.add(new RowBatch.Row(fieldVectors.size()));
+ }
+ convertArrowToRowBatch();
+ readRowCount += root.getRowCount();
+ return this;
+ } catch (DorisException e) {
+ logger.error("Read Doris Data failed because: ", e);
+ throw new DorisRuntimeException(e.getMessage());
+ } catch (IOException e) {
+ return this;
+ }
+ }
+
+ public RowBatch readArrow() {
try {
this.root = arrowStreamReader.getVectorSchemaRoot();
while (arrowStreamReader.loadNextBatch()) {
fieldVectors = root.getFieldVectors();
if (fieldVectors.size() > schema.size()) {
- logger.error("Schema size '{}' is not equal to arrow field
size '{}'.",
- fieldVectors.size(), schema.size());
- throw new DorisException("Load Doris data failed, schema
size of fetch data is wrong.");
+ logger.error(
+ "Data schema size '{}' should not be bigger than
arrow field size '{}'",
+ schema.size(),
+ fieldVectors.size());
+ throw new DorisException(
+ "Load Doris data failed, schema size of fetch data
is wrong.");
}
- if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
+ if (fieldVectors.isEmpty() || root.getRowCount() == 0) {
logger.debug("One batch in arrow has no data.");
continue;
}
@@ -123,23 +186,21 @@ public class RowBatch {
return this;
} catch (Exception e) {
logger.error("Read Doris Data failed because: ", e);
- throw new DorisException(e.getMessage());
+ throw new DorisRuntimeException(e.getMessage());
} finally {
close();
}
}
public boolean hasNext() {
- if (offsetInRowBatch < readRowCount) {
- return true;
- }
- return false;
+ return offsetInRowBatch < readRowCount;
}
- private void addValueToRow(int rowIndex, Object obj) {
+ @VisibleForTesting
+ public void addValueToRow(int rowIndex, Object obj) {
if (rowIndex > rowCountInOneBatch) {
- String errMsg = "Get row offset: " + rowIndex + " larger than row
size: " +
- rowCountInOneBatch;
+ String errMsg =
+ "Get row offset: " + rowIndex + " larger than row size: "
+ rowCountInOneBatch;
logger.error(errMsg);
throw new NoSuchElementException(errMsg);
}
@@ -149,176 +210,397 @@ public class RowBatch {
public void convertArrowToRowBatch() throws DorisException {
try {
for (int col = 0; col < fieldVectors.size(); col++) {
- FieldVector curFieldVector = fieldVectors.get(col);
- Types.MinorType mt = curFieldVector.getMinorType();
-
+ FieldVector fieldVector = fieldVectors.get(col);
+ MinorType minorType = fieldVector.getMinorType();
final String currentType = schema.get(col).getType();
- switch (currentType) {
- case "NULL_TYPE":
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- addValueToRow(rowIndex, null);
- }
- break;
- case "BOOLEAN":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.BIT),
- typeMismatchMessage(currentType, mt));
- BitVector bitVector = (BitVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = bitVector.isNull(rowIndex) ?
null : bitVector.get(rowIndex) != 0;
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "TINYINT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.TINYINT),
- typeMismatchMessage(currentType, mt));
- TinyIntVector tinyIntVector = (TinyIntVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = tinyIntVector.isNull(rowIndex)
? null : tinyIntVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "SMALLINT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.SMALLINT),
- typeMismatchMessage(currentType, mt));
- SmallIntVector smallIntVector = (SmallIntVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue =
smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "INT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.INT),
- typeMismatchMessage(currentType, mt));
- IntVector intVector = (IntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = intVector.isNull(rowIndex) ?
null : intVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "BIGINT":
+ final String colName = schema.get(col).getName();
+ for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
+ boolean passed = doConvert(col, rowIndex, minorType,
currentType, fieldVector);
+ if (!passed) {
+ throw new IllegalArgumentException(
+ "FLINK type is "
+ + currentType
+ + ", but arrow type is "
+ + minorType.name()
+ + ", column Name is "
+ + colName);
+ }
+ }
+ }
+ } catch (Exception e) {
+ close();
+ throw e;
+ }
+ }
-
Preconditions.checkArgument(mt.equals(Types.MinorType.BIGINT),
- typeMismatchMessage(currentType, mt));
- BigIntVector bigIntVector = (BigIntVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = bigIntVector.isNull(rowIndex)
? null : bigIntVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "FLOAT":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4),
- typeMismatchMessage(currentType, mt));
- Float4Vector float4Vector = (Float4Vector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = float4Vector.isNull(rowIndex)
? null : float4Vector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
- break;
- case "TIME":
- case "DOUBLE":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT8),
- typeMismatchMessage(currentType, mt));
- Float8Vector float8Vector = (Float8Vector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue = float8Vector.isNull(rowIndex)
? null : float8Vector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
+ @VisibleForTesting
+ public boolean doConvert(
+ int col, int rowIndex, MinorType minorType, String currentType,
FieldVector fieldVector)
+ throws DorisException {
+ switch (currentType) {
+ case "NULL_TYPE":
+ break;
+ case "BOOLEAN":
+ if (!minorType.equals(MinorType.BIT)) {
+ return false;
+ }
+ BitVector bitVector = (BitVector) fieldVector;
+ Object fieldValue =
+ bitVector.isNull(rowIndex) ? null :
bitVector.get(rowIndex) != 0;
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "TINYINT":
+ if (!minorType.equals(MinorType.TINYINT)) {
+ return false;
+ }
+ TinyIntVector tinyIntVector = (TinyIntVector) fieldVector;
+ fieldValue = tinyIntVector.isNull(rowIndex) ? null :
tinyIntVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "SMALLINT":
+ if (!minorType.equals(MinorType.SMALLINT)) {
+ return false;
+ }
+ SmallIntVector smallIntVector = (SmallIntVector) fieldVector;
+ fieldValue = smallIntVector.isNull(rowIndex) ? null :
smallIntVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "INT":
+ if (!minorType.equals(MinorType.INT)) {
+ return false;
+ }
+ IntVector intVector = (IntVector) fieldVector;
+ fieldValue = intVector.isNull(rowIndex) ? null :
intVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "BIGINT":
+ if (!minorType.equals(MinorType.BIGINT)) {
+ return false;
+ }
+ BigIntVector bigIntVector = (BigIntVector) fieldVector;
+ fieldValue = bigIntVector.isNull(rowIndex) ? null :
bigIntVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "FLOAT":
+ if (!minorType.equals(MinorType.FLOAT4)) {
+ return false;
+ }
+ Float4Vector float4Vector = (Float4Vector) fieldVector;
+ fieldValue = float4Vector.isNull(rowIndex) ? null :
float4Vector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "TIME":
+ case "DOUBLE":
+ if (!minorType.equals(MinorType.FLOAT8)) {
+ return false;
+ }
+ Float8Vector float8Vector = (Float8Vector) fieldVector;
+ fieldValue = float8Vector.isNull(rowIndex) ? null :
float8Vector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "BINARY":
+ if (!minorType.equals(MinorType.VARBINARY)) {
+ return false;
+ }
+ VarBinaryVector varBinaryVector = (VarBinaryVector)
fieldVector;
+ fieldValue =
+ varBinaryVector.isNull(rowIndex) ? null :
varBinaryVector.get(rowIndex);
+ addValueToRow(rowIndex, fieldValue);
+ break;
+ case "DECIMAL":
+ case "DECIMALV2":
+ case "DECIMAL32":
+ case "DECIMAL64":
+ case "DECIMAL128I":
+ case "DECIMAL128":
+ if (!minorType.equals(MinorType.DECIMAL)) {
+ return false;
+ }
+ DecimalVector decimalVector = (DecimalVector) fieldVector;
+ if (decimalVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ BigDecimal value =
decimalVector.getObject(rowIndex).stripTrailingZeros();
+ addValueToRow(rowIndex, value);
+ break;
+ case "DECIMAL256":
+ if (!minorType.equals(MinorType.DECIMAL256)) {
+ return false;
+ }
+ Decimal256Vector decimal256Vector = (Decimal256Vector)
fieldVector;
+ if (decimal256Vector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ BigDecimal value256 =
decimal256Vector.getObject(rowIndex).stripTrailingZeros();
+ addValueToRow(rowIndex, value256);
+ break;
+ case "DATE":
+ case "DATEV2":
+ if (!minorType.equals(MinorType.DATEDAY) &&
!minorType.equals(MinorType.VARCHAR)) {
+ return false;
+ }
+ if (minorType.equals(MinorType.VARCHAR)) {
+ VarCharVector date = (VarCharVector) fieldVector;
+ if (date.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
break;
- case "BINARY":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARBINARY),
- typeMismatchMessage(currentType, mt));
- VarBinaryVector varBinaryVector = (VarBinaryVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- Object fieldValue =
varBinaryVector.isNull(rowIndex) ? null : varBinaryVector.get(rowIndex);
- addValueToRow(rowIndex, fieldValue);
- }
+ }
+ String stringValue = new String(date.get(rowIndex),
StandardCharsets.UTF_8);
+ LocalDate localDate =
FastDateUtil.fastParseDate(stringValue, DATE_PATTERN);
+ addValueToRow(rowIndex, localDate);
+ } else {
+ DateDayVector date = (DateDayVector) fieldVector;
+ if (date.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
break;
- case "DECIMAL":
- case "DECIMALV2":
- case "DECIMAL32":
- case "DECIMAL64":
- case "DECIMAL128I":
- case "DECIMAL128":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.DECIMAL),
- typeMismatchMessage(currentType, mt));
- DecimalVector decimalVector = (DecimalVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (decimalVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- BigDecimal value =
decimalVector.getObject(rowIndex).stripTrailingZeros();
- addValueToRow(rowIndex, value);
- }
+ }
+ LocalDate localDate =
LocalDate.ofEpochDay(date.get(rowIndex));
+ addValueToRow(rowIndex, localDate);
+ }
+ break;
+ case "DATETIME":
+ if (minorType.equals(MinorType.VARCHAR)) {
+ VarCharVector varCharVector = (VarCharVector) fieldVector;
+ if (varCharVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
break;
- case "DATE":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector date = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (date.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new String(date.get(rowIndex));
- LocalDate localDate = LocalDate.parse(value,
dateFormatter);
- addValueToRow(rowIndex, localDate);
- }
+ }
+ String stringValue =
+ new String(varCharVector.get(rowIndex),
StandardCharsets.UTF_8);
+ stringValue = completeMilliseconds(stringValue);
+ LocalDateTime parse =
+ FastDateUtil.fastParseDateTime(stringValue,
DATETIME_PATTERN);
+ addValueToRow(rowIndex, parse);
+ } else if (fieldVector instanceof TimeStampVector) {
+ LocalDateTime dateTime = getDateTime(rowIndex,
fieldVector);
+ addValueToRow(rowIndex, dateTime);
+ } else {
+ logger.error(
+ "Unsupported type for DATETIME, minorType {},
class is {}",
+ minorType.name(),
+ fieldVector == null ? null :
fieldVector.getClass());
+ return false;
+ }
+ break;
+ case "DATETIMEV2":
+ if (minorType.equals(MinorType.VARCHAR)) {
+ VarCharVector varCharVector = (VarCharVector) fieldVector;
+ if (varCharVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
break;
- case "DATETIME":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector timeStampSecVector = (VarCharVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (timeStampSecVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new
String(timeStampSecVector.get(rowIndex));
- LocalDateTime parse = LocalDateTime.parse(value,
dateTimeFormatter);
- addValueToRow(rowIndex, parse);
- }
+ }
+ String stringValue =
+ new String(varCharVector.get(rowIndex),
StandardCharsets.UTF_8);
+ stringValue = completeMilliseconds(stringValue);
+ LocalDateTime parse =
+ FastDateUtil.fastParseDateTimeV2(stringValue,
DATETIMEV2_PATTERN);
+ addValueToRow(rowIndex, parse);
+ } else if (fieldVector instanceof TimeStampVector) {
+ LocalDateTime dateTime = getDateTime(rowIndex,
fieldVector);
+ addValueToRow(rowIndex, dateTime);
+ } else {
+ logger.error(
+ "Unsupported type for DATETIMEV2, minorType {},
class is {}",
+ minorType.name(),
+ fieldVector == null ? null :
fieldVector.getClass());
+ return false;
+ }
+ break;
+ case "LARGEINT":
+ if (!minorType.equals(MinorType.FIXEDSIZEBINARY)
+ && !minorType.equals(MinorType.VARCHAR)) {
+ return false;
+ }
+ if (minorType.equals(MinorType.FIXEDSIZEBINARY)) {
+ FixedSizeBinaryVector largeIntVector =
(FixedSizeBinaryVector) fieldVector;
+ if (largeIntVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
break;
- case "LARGEINT":
- case "CHAR":
- case "VARCHAR":
- case "STRING":
-
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
- typeMismatchMessage(currentType, mt));
- VarCharVector varCharVector = (VarCharVector)
curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCountInOneBatch;
rowIndex++) {
- if (varCharVector.isNull(rowIndex)) {
- addValueToRow(rowIndex, null);
- continue;
- }
- String value = new
String(varCharVector.get(rowIndex));
- addValueToRow(rowIndex, value);
- }
+ }
+ byte[] bytes = largeIntVector.get(rowIndex);
+ int left = 0, right = bytes.length - 1;
+ while (left < right) {
+ byte temp = bytes[left];
+ bytes[left] = bytes[right];
+ bytes[right] = temp;
+ left++;
+ right--;
+ }
+ BigInteger largeInt = new BigInteger(bytes);
+ addValueToRow(rowIndex, largeInt);
+ break;
+ } else {
+ VarCharVector largeIntVector = (VarCharVector) fieldVector;
+ if (largeIntVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
break;
- default:
- String errMsg = "Unsupported type " +
schema.get(col).getType();
- logger.error(errMsg);
- throw new DorisException(errMsg);
+ }
+ String stringValue =
+ new String(largeIntVector.get(rowIndex),
StandardCharsets.UTF_8);
+ BigInteger largeInt = new BigInteger(stringValue);
+ addValueToRow(rowIndex, largeInt);
+ break;
}
- }
- } catch (Exception e) {
- close();
- throw e;
+ case "CHAR":
+ case "VARCHAR":
+ case "STRING":
+ case "JSONB":
+ case "JSON":
+ case "VARIANT":
+ if (!minorType.equals(MinorType.VARCHAR)) {
+ return false;
+ }
+ VarCharVector varCharVector = (VarCharVector) fieldVector;
+ if (varCharVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ String stringValue =
+ new String(varCharVector.get(rowIndex),
StandardCharsets.UTF_8);
+ addValueToRow(rowIndex, stringValue);
+ break;
+ case "ARRAY":
+ if (!minorType.equals(MinorType.LIST)) {
+ return false;
+ }
+ ListVector listVector = (ListVector) fieldVector;
+ Object listValue =
+ listVector.isNull(rowIndex) ? null :
listVector.getObject(rowIndex);
+ // todo: when the subtype of array is date, conversion is
required
+ addValueToRow(rowIndex, listValue);
+ break;
+ case "MAP":
+ if (!minorType.equals(MinorType.MAP)) {
+ return false;
+ }
+ MapVector mapVector = (MapVector) fieldVector;
+ UnionMapReader reader = mapVector.getReader();
+ if (mapVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ reader.setPosition(rowIndex);
+ Map<String, Object> mapValue = new HashMap<>();
+ while (reader.next()) {
+ FieldReader keyReader = reader.key();
+ FieldReader valueReader = reader.value();
+ Object mapKeyObj = handleMapFieldReader(keyReader);
+ Object mapValueObj = handleMapFieldReader(valueReader);
+ mapValue.put(mapKeyObj.toString(), mapValueObj);
+ }
+ addValueToRow(rowIndex, mapValue);
+ break;
+ case "STRUCT":
+ if (!minorType.equals(MinorType.STRUCT)) {
+ return false;
+ }
+ StructVector structVector = (StructVector) fieldVector;
+ if (structVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ Map<String, ?> structValue = structVector.getObject(rowIndex);
+ addValueToRow(rowIndex, structValue);
+ break;
+ default:
+ String errMsg = "Unsupported type " +
schema.get(col).getType();
+ logger.error(errMsg);
+ throw new DorisException(errMsg);
+ }
+ return true;
+ }
+
+ private Object handleMapFieldReader(FieldReader reader) {
+ if (reader instanceof TimeStampMicroReaderImpl) {
+ return longToLocalDateTime(reader.readLong());
+ }
+ if (reader instanceof DateDayReaderImpl) {
+ return LocalDate.ofEpochDay(((Integer)
reader.readObject()).longValue());
+ }
+ return reader.readObject();
+ }
+
+ @VisibleForTesting
+ public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
+ TimeStampVector vector = (TimeStampVector) fieldVector;
+ if (vector.isNull(rowIndex)) {
+ return null;
+ }
+ // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded
to 6,
+ // and there is also a time zone problem in arrow, so use timestamp to
convert first
+ long time = vector.get(rowIndex);
+ return longToLocalDateTime(time);
+ }
+
+ @VisibleForTesting
+ public static LocalDateTime longToLocalDateTime(long time) {
+ Instant instant;
+ // Determine the timestamp accuracy and process it
+ if (time < 10_000_000_000L) { // Second timestamp
+ instant = Instant.ofEpochSecond(time);
+ } else if (time < 10_000_000_000_000L) { // milli second
+ instant = Instant.ofEpochMilli(time);
+ } else { // micro second
+ instant = Instant.ofEpochSecond(time / 1_000_000, (time %
1_000_000) * 1_000);
+ }
+ return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
+ }
+
+ /**
+ * use case when to replace while "Benchmark","Mode","Threads","Samples","Score","Score Error.
+ * (99.9%)","Unit" "CaseWhenTest", "thrpt", 1, 5, 40657433.897696, 2515802.067503,"ops/s"
+ * "WhileTest", "thrpt", 1, 5, 9708130.819491, 1207453.635429,"ops/s"
+ *
+ * @param stringValue
+ * @return
+ */
+ @VisibleForTesting
+ public static String completeMilliseconds(String stringValue) {
+ if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
+ return stringValue;
+ }
+
+ if (stringValue.length() < DATETIME_PATTERN.length()) {
+ return stringValue;
+ }
+
+ if (stringValue.length() == DATETIME_PATTERN.length()) {
+ stringValue += ".";
+ }
+ int s = DATETIMEV2_PATTERN.length() - stringValue.length();
+ switch (s) {
+ case 1:
+ return stringValue + "0";
+ case 2:
+ return stringValue + "00";
+ case 3:
+ return stringValue + "000";
+ case 4:
+ return stringValue + "0000";
+ case 5:
+ return stringValue + "00000";
+ case 6:
+ return stringValue + "000000";
+ default:
+ return stringValue;
}
}
- public List<Object> next() throws DorisException {
+ public List<Object> next() {
if (!hasNext()) {
- String errMsg = "Get row offset:" + offsetInRowBatch + " larger
than row size: " + readRowCount;
+ String errMsg =
+ "Get row offset:" + offsetInRowBatch + " larger than row
size: " + readRowCount;
logger.error(errMsg);
throw new NoSuchElementException(errMsg);
}
return rowBatch.get(offsetInRowBatch++).getCols();
}
- private String typeMismatchMessage(final String sparkType, final
Types.MinorType arrowType) {
+ private String typeMismatchMessage(final String flinkType, final MinorType
arrowType) {
final String messageTemplate = "FLINK type is %1$s, but arrow type is
%2$s.";
- return String.format(messageTemplate, sparkType, arrowType.name());
+ return String.format(messageTemplate, flinkType, arrowType.name());
}
public int getReadRowCount() {
@@ -337,4 +619,4 @@ public class RowBatch {
// do nothing
}
}
-}
+}
\ No newline at end of file
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
new file mode 100644
index 00000000..e1e7fd0d
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/FastDateUtil.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.util;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+
+/**
+ * idea for this util is from https://bugs.openjdk.org/browse/JDK-8144808 991ms.
+ * LocalDateTime.parse(...) 246ms : LocalDateTime.of(...)
+ */
+public final class FastDateUtil {
+
+ public static LocalDateTime fastParseDateTimeV2(String dateTime, String
pattern) {
+ char[] arr = dateTime.toCharArray();
+ int[] indexes =
+ new int[] {
+ pattern.indexOf("yyyy"),
+ pattern.indexOf("MM"),
+ pattern.indexOf("dd"),
+ pattern.indexOf("HH"),
+ pattern.indexOf("mm"),
+ pattern.indexOf("ss"),
+ pattern.indexOf("SSSSSS")
+ };
+ int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+ int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+ int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+ int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2);
+ int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2);
+ int second = parseFromIndex(arr, indexes[5], indexes[5] + 2);
+ int nanos = parseFromIndex(arr, indexes[6], indexes[6] + 6) * 1000;
+ return LocalDateTime.of(year, month, day, hour, minute, second, nanos);
+ }
+
+ public static LocalDateTime fastParseDateTime(String dateTime, String
pattern) {
+ char[] arr = dateTime.toCharArray();
+ int[] indexes =
+ new int[] {
+ pattern.indexOf("yyyy"),
+ pattern.indexOf("MM"),
+ pattern.indexOf("dd"),
+ pattern.indexOf("HH"),
+ pattern.indexOf("mm"),
+ pattern.indexOf("ss")
+ };
+ int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+ int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+ int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+ int hour = parseFromIndex(arr, indexes[3], indexes[3] + 2);
+ int minute = parseFromIndex(arr, indexes[4], indexes[4] + 2);
+ int second = parseFromIndex(arr, indexes[5], indexes[5] + 2);
+ return LocalDateTime.of(year, month, day, hour, minute, second);
+ }
+
+ public static LocalDate fastParseDate(String dateTime, String pattern) {
+ char[] arr = dateTime.toCharArray();
+ int[] indexes =
+ new int[] {
+ pattern.indexOf("yyyy"), pattern.indexOf("MM"),
pattern.indexOf("dd"),
+ };
+ int year = parseFromIndex(arr, indexes[0], indexes[0] + 4);
+ int month = parseFromIndex(arr, indexes[1], indexes[1] + 2);
+ int day = parseFromIndex(arr, indexes[2], indexes[2] + 2);
+ return LocalDate.of(year, month, day);
+ }
+
+ private static int parseFromIndex(char[] arr, int start, int end) {
+ int value = 0;
+ for (int i = start; i < end; i++) {
+ value = value * 10 + (arr[i] - '0');
+ }
+ return value;
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]