This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2707f4f  [improve] support read ipv4 in Doris 2.1.0 (#369)
2707f4f is described below

commit 2707f4f601bdd116e415b6df896095b5a2f894dd
Author: Petrichor <[email protected]>
AuthorDate: Wed Apr 24 22:11:16 2024 +0800

    [improve] support read ipv4 in Doris 2.1.0 (#369)
---
 .../apache/doris/flink/serialization/RowBatch.java | 12 ++++++++--
 .../doris/flink/serialization/TestRowBatch.java    | 27 ++++++++++++++++++++--
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index 9e87bde..b4f35ab 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -20,6 +20,7 @@ package org.apache.doris.flink.serialization;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BaseIntVector;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
@@ -228,10 +229,17 @@ public class RowBatch {
                 addValueToRow(rowIndex, fieldValue);
                 break;
             case "IPV4":
-                if (!minorType.equals(Types.MinorType.UINT4)) {
+                if (!minorType.equals(Types.MinorType.UINT4)
+                        && !minorType.equals(Types.MinorType.INT)) {
                     return false;
                 }
-                UInt4Vector ipv4Vector = (UInt4Vector) fieldVector;
+                BaseIntVector ipv4Vector;
+                if (minorType.equals(Types.MinorType.INT)) {
+                    ipv4Vector = (IntVector) fieldVector;
+
+                } else {
+                    ipv4Vector = (UInt4Vector) fieldVector;
+                }
                 fieldValue =
                         ipv4Vector.isNull(rowIndex)
                                 ? null
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 29e48b6..d9a6422 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -1105,7 +1105,8 @@ public class TestRowBatch {
 
         ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
         childrenBuilder.add(
-                new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null));
+                new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null),
+                new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null));
 
         VectorSchemaRoot root =
                 VectorSchemaRoot.create(
@@ -1134,7 +1135,23 @@ public class TestRowBatch {
         uInt4Vector.setSafe(3, 16777215);
         uInt4Vector.setIndexDefined(4);
         uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
+
+        FieldVector vector1 = root.getVector("k2");
+        IntVector intVector = (IntVector) vector1;
+        intVector.setInitialCapacity(5);
+        intVector.allocateNew(4);
+        intVector.setIndexDefined(0);
+        intVector.setSafe(0, 0);
+        intVector.setIndexDefined(1);
+        intVector.setSafe(1, 255);
+        intVector.setIndexDefined(2);
+        intVector.setSafe(2, 65535);
+        intVector.setIndexDefined(3);
+        intVector.setSafe(3, 16777215);
+        intVector.setIndexDefined(4);
+        intVector.setWithPossibleTruncate(4, 4294967295L);
         vector.setValueCount(5);
+        vector1.setValueCount(5);
         arrowStreamWriter.writeBatch();
 
         arrowStreamWriter.end();
@@ -1149,7 +1166,8 @@ public class TestRowBatch {
 
         String schemaStr =
                 "{\"properties\":["
-                        + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}"
+                        + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"},"
+                        + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
                         + "], \"status\":200}";
 
         Schema schema = RestService.parseSchema(schemaStr, logger);
@@ -1158,16 +1176,21 @@ public class TestRowBatch {
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow0 = rowBatch.next();
         assertEquals("0.0.0.0", actualRow0.get(0));
+        assertEquals("0.0.0.0", actualRow0.get(1));
         List<Object> actualRow1 = rowBatch.next();
         assertEquals("0.0.0.255", actualRow1.get(0));
+        assertEquals("0.0.0.255", actualRow1.get(1));
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow2 = rowBatch.next();
         assertEquals("0.0.255.255", actualRow2.get(0));
+        assertEquals("0.0.255.255", actualRow2.get(1));
         Assert.assertTrue(rowBatch.hasNext());
         List<Object> actualRow3 = rowBatch.next();
         assertEquals("0.255.255.255", actualRow3.get(0));
+        assertEquals("0.255.255.255", actualRow3.get(1));
         List<Object> actualRow4 = rowBatch.next();
         assertEquals("255.255.255.255", actualRow4.get(0));
+        assertEquals("255.255.255.255", actualRow4.get(1));
         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
         thrown.expectMessage(startsWith("Get row offset:"));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to