This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 2707f4f [improve] support read ipv4 in Doris 2.1.0 (#369)
2707f4f is described below
commit 2707f4f601bdd116e415b6df896095b5a2f894dd
Author: Petrichor <[email protected]>
AuthorDate: Wed Apr 24 22:11:16 2024 +0800
[improve] support read ipv4 in Doris 2.1.0 (#369)
---
.../apache/doris/flink/serialization/RowBatch.java | 12 ++++++++--
.../doris/flink/serialization/TestRowBatch.java | 27 ++++++++++++++++++++--
2 files changed, 35 insertions(+), 4 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 9e87bde..b4f35ab 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.serialization;
import org.apache.flink.util.Preconditions;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
@@ -228,10 +229,17 @@ public class RowBatch {
addValueToRow(rowIndex, fieldValue);
break;
case "IPV4":
- if (!minorType.equals(Types.MinorType.UINT4)) {
+ if (!minorType.equals(Types.MinorType.UINT4)
+ && !minorType.equals(Types.MinorType.INT)) {
return false;
}
- UInt4Vector ipv4Vector = (UInt4Vector) fieldVector;
+ BaseIntVector ipv4Vector;
+ if (minorType.equals(Types.MinorType.INT)) {
+ ipv4Vector = (IntVector) fieldVector;
+
+ } else {
+ ipv4Vector = (UInt4Vector) fieldVector;
+ }
fieldValue =
ipv4Vector.isNull(rowIndex)
? null
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 29e48b6..d9a6422 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -1105,7 +1105,8 @@ public class TestRowBatch {
ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(
- new Field("k1", FieldType.nullable(new ArrowType.Int(32,
false)), null));
+ new Field("k1", FieldType.nullable(new ArrowType.Int(32,
false)), null),
+ new Field("k2", FieldType.nullable(new ArrowType.Int(32,
true)), null));
VectorSchemaRoot root =
VectorSchemaRoot.create(
@@ -1134,7 +1135,23 @@ public class TestRowBatch {
uInt4Vector.setSafe(3, 16777215);
uInt4Vector.setIndexDefined(4);
uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
+
+ FieldVector vector1 = root.getVector("k2");
+ IntVector intVector = (IntVector) vector1;
+ intVector.setInitialCapacity(5);
+ intVector.allocateNew(4);
+ intVector.setIndexDefined(0);
+ intVector.setSafe(0, 0);
+ intVector.setIndexDefined(1);
+ intVector.setSafe(1, 255);
+ intVector.setIndexDefined(2);
+ intVector.setSafe(2, 65535);
+ intVector.setIndexDefined(3);
+ intVector.setSafe(3, 16777215);
+ intVector.setIndexDefined(4);
+ intVector.setWithPossibleTruncate(4, 4294967295L);
vector.setValueCount(5);
+ vector1.setValueCount(5);
arrowStreamWriter.writeBatch();
arrowStreamWriter.end();
@@ -1149,7 +1166,8 @@ public class TestRowBatch {
String schemaStr =
"{\"properties\":["
- +
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}"
+ +
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"},"
+ +
"{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
+ "], \"status\":200}";
Schema schema = RestService.parseSchema(schemaStr, logger);
@@ -1158,16 +1176,21 @@ public class TestRowBatch {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
assertEquals("0.0.0.0", actualRow0.get(0));
+ assertEquals("0.0.0.0", actualRow0.get(1));
List<Object> actualRow1 = rowBatch.next();
assertEquals("0.0.0.255", actualRow1.get(0));
+ assertEquals("0.0.0.255", actualRow1.get(1));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow2 = rowBatch.next();
assertEquals("0.0.255.255", actualRow2.get(0));
+ assertEquals("0.0.255.255", actualRow2.get(1));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow3 = rowBatch.next();
assertEquals("0.255.255.255", actualRow3.get(0));
+ assertEquals("0.255.255.255", actualRow3.get(1));
List<Object> actualRow4 = rowBatch.next();
assertEquals("255.255.255.255", actualRow4.get(0));
+ assertEquals("255.255.255.255", actualRow4.get(1));
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
thrown.expectMessage(startsWith("Get row offset:"));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]