This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 27bf18e3 [Fix] fix arrow read timestamp bug (#446)
27bf18e3 is described below
commit 27bf18e39c6f47ba01bbc34adf66795c2c9aedc1
Author: wudi <[email protected]>
AuthorDate: Fri Aug 2 11:16:57 2024 +0800
[Fix] fix arrow read timestamp bug (#446)
---
.../apache/doris/flink/serialization/RowBatch.java | 30 +++---
.../doris/flink/serialization/TestRowBatch.java | 109 +++++++++++++++++++++
2 files changed, 127 insertions(+), 12 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 1a42b2b9..38c63b77 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -31,7 +31,7 @@ import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
@@ -334,10 +334,6 @@ public class RowBatch {
}
break;
case "DATETIME":
- if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
- && !minorType.equals(Types.MinorType.VARCHAR)) {
- return false;
- }
if (minorType.equals(Types.MinorType.VARCHAR)) {
VarCharVector varCharVector = (VarCharVector) fieldVector;
if (varCharVector.isNull(rowIndex)) {
@@ -347,16 +343,18 @@ public class RowBatch {
String stringValue = new
String(varCharVector.get(rowIndex));
LocalDateTime parse = LocalDateTime.parse(stringValue,
dateTimeFormatter);
addValueToRow(rowIndex, parse);
- } else {
+ } else if (fieldVector instanceof TimeStampVector) {
LocalDateTime dateTime = getDateTime(rowIndex,
fieldVector);
addValueToRow(rowIndex, dateTime);
+ } else {
+ logger.error(
+ "Unsupported type for DATETIME, minorType {},
class is {}",
+ minorType.name(),
+ fieldVector == null ? null :
fieldVector.getClass());
+ return false;
}
break;
case "DATETIMEV2":
- if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
- && !minorType.equals(Types.MinorType.VARCHAR)) {
- return false;
- }
if (minorType.equals(Types.MinorType.VARCHAR)) {
VarCharVector varCharVector = (VarCharVector) fieldVector;
if (varCharVector.isNull(rowIndex)) {
@@ -367,9 +365,15 @@ public class RowBatch {
stringValue = completeMilliseconds(stringValue);
LocalDateTime parse = LocalDateTime.parse(stringValue,
dateTimeV2Formatter);
addValueToRow(rowIndex, parse);
- } else {
+ } else if (fieldVector instanceof TimeStampVector) {
LocalDateTime dateTime = getDateTime(rowIndex,
fieldVector);
addValueToRow(rowIndex, dateTime);
+ } else {
+ logger.error(
+ "Unsupported type for DATETIMEV2, minorType {},
class is {}",
+ minorType.name(),
+ fieldVector == null ? null :
fieldVector.getClass());
+ return false;
}
break;
case "LARGEINT":
@@ -498,10 +502,12 @@ public class RowBatch {
@VisibleForTesting
public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
- TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector;
+ TimeStampVector vector = (TimeStampVector) fieldVector;
if (vector.isNull(rowIndex)) {
return null;
}
+ // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded to 6,
+ // and there is also a time zone problem in arrow, so use timestamp to convert first
long time = vector.get(rowIndex);
return longToLocalDateTime(time);
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index e1f7b6e5..3d9ac261 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -34,6 +34,8 @@ import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
@@ -1331,9 +1333,23 @@ public class TestRowBatch {
flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIME", null);
Assert.assertFalse(flag);
+ flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC,
"DATETIME", null);
+ Assert.assertFalse(flag);
+
+ IntVector intVector1 = new IntVector("test", new
RootAllocator(Integer.MAX_VALUE));
+ intVector1.setNull(0);
+ flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC,
"DATETIME", intVector1);
+ Assert.assertFalse(flag);
+
flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIMEV2",
null);
Assert.assertFalse(flag);
+ flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC,
"DATETIMEV2", null);
+ Assert.assertFalse(flag);
+
+ flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC,
"DATETIMEV2", intVector1);
+ Assert.assertFalse(flag);
+
flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "LARGEINT", null);
Assert.assertFalse(flag);
@@ -1658,4 +1674,97 @@ public class TestRowBatch {
Assert.assertArrayEquals(expectArray, resultArray);
}
+
+ @Test
+ public void timestampVector() throws IOException, DorisException {
+ List<Field> childrenBuilder = new ArrayList<>();
+ childrenBuilder.add(
+ new Field(
+ "k0",
+ FieldType.nullable(new
ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
+ null));
+ childrenBuilder.add(
+ new Field(
+ "k1",
+ FieldType.nullable(new
ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
+ null));
+ childrenBuilder.add(
+ new Field(
+ "k2",
+ FieldType.nullable(new
ArrowType.Timestamp(TimeUnit.SECOND, null)),
+ null));
+
+ VectorSchemaRoot root =
+ VectorSchemaRoot.create(
+ new
org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
+ new RootAllocator(Integer.MAX_VALUE));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ArrowStreamWriter arrowStreamWriter =
+ new ArrowStreamWriter(
+ root, new DictionaryProvider.MapDictionaryProvider(),
outputStream);
+
+ arrowStreamWriter.start();
+ root.setRowCount(1);
+
+ FieldVector vector = root.getVector("k0");
+ TimeStampMicroVector mircoVec = (TimeStampMicroVector) vector;
+ mircoVec.allocateNew(1);
+ mircoVec.setIndexDefined(0);
+ mircoVec.setSafe(0, 1721892143586123L);
+ vector.setValueCount(1);
+
+ vector = root.getVector("k1");
+ TimeStampMilliVector milliVector = (TimeStampMilliVector) vector;
+ milliVector.allocateNew(1);
+ milliVector.setIndexDefined(0);
+ milliVector.setSafe(0, 1721892143586L);
+ vector.setValueCount(1);
+
+ vector = root.getVector("k2");
+ TimeStampSecVector secVector = (TimeStampSecVector) vector;
+ secVector.allocateNew(1);
+ secVector.setIndexDefined(0);
+ secVector.setSafe(0, 1721892143L);
+ vector.setValueCount(1);
+
+ arrowStreamWriter.writeBatch();
+
+ arrowStreamWriter.end();
+ arrowStreamWriter.close();
+
+ TStatus status = new TStatus();
+ status.setStatusCode(TStatusCode.OK);
+ TScanBatchResult scanBatchResult = new TScanBatchResult();
+ scanBatchResult.setStatus(status);
+ scanBatchResult.setEos(false);
+ scanBatchResult.setRows(outputStream.toByteArray());
+
+ String schemaStr =
+
"{\"properties\":[{\"type\":\"DATETIME\",\"name\":\"k0\",\"comment\":\"\"},
{\"type\":\"DATETIME\",\"name\":\"k1\",\"comment\":\"\"},
{\"type\":\"DATETIME\",\"name\":\"k2\",\"comment\":\"\"}],"
+ + "\"status\":200}";
+
+ Schema schema = RestService.parseSchema(schemaStr, logger);
+
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+ List<Object> next = rowBatch.next();
+ Assert.assertEquals(next.size(), 3);
+ Assert.assertEquals(
+ next.get(0),
+ LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586123000)
+ .atZone(ZoneId.of("UTC+8"))
+ .withZoneSameInstant(ZoneId.systemDefault())
+ .toLocalDateTime());
+ Assert.assertEquals(
+ next.get(1),
+ LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586000000)
+ .atZone(ZoneId.of("UTC+8"))
+ .withZoneSameInstant(ZoneId.systemDefault())
+ .toLocalDateTime());
+ Assert.assertEquals(
+ next.get(2),
+ LocalDateTime.of(2024, 7, 25, 15, 22, 23, 0)
+ .atZone(ZoneId.of("UTC+8"))
+ .withZoneSameInstant(ZoneId.systemDefault())
+ .toLocalDateTime());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]