This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new a3cf8a6  [feature]support read ipv4/ipv6 data type (#199)
a3cf8a6 is described below

commit a3cf8a63bdc9356d6bcaace831be5fd9b41fcf20
Author: Petrichor <[email protected]>
AuthorDate: Tue May 7 22:08:03 2024 +0800

    [feature]support read ipv4/ipv6 data type (#199)
---
 .../apache/doris/spark/serialization/RowBatch.java |  35 ++-
 .../org/apache/doris/spark/sql/SchemaUtils.scala   |   2 +
 .../scala/org/apache/doris/spark/util/IPUtils.java | 204 ++++++++++++++++
 .../doris/spark/serialization/TestRowBatch.java    | 258 ++++++++++++++++++++-
 4 files changed, 489 insertions(+), 10 deletions(-)

diff --git 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
index 8fd84d3..b38c007 100644
--- 
a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
+++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java
@@ -19,6 +19,7 @@ package org.apache.doris.spark.serialization;
 
 import com.google.common.base.Preconditions;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.BaseIntVector;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
 import org.apache.arrow.vector.DateDayVector;
@@ -31,6 +32,7 @@ import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -44,6 +46,7 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
 import org.apache.doris.spark.exception.DorisException;
 import org.apache.doris.spark.rest.models.Schema;
+import org.apache.doris.spark.util.IPUtils;
 import org.apache.spark.sql.types.Decimal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,6 +72,8 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 
+import static org.apache.doris.spark.util.IPUtils.convertLongToIPv4Address;
+
 /**
  * row batch data container.
  */
@@ -246,6 +251,20 @@ public class RowBatch {
                             }
                         }
                         break;
+                    case "IPV4":
+                        
Preconditions.checkArgument(mt.equals(Types.MinorType.UINT4) || 
mt.equals(Types.MinorType.INT),
+                                typeMismatchMessage(currentType, mt));
+                        BaseIntVector ipv4Vector;
+                        if (mt.equals(Types.MinorType.INT)) {
+                            ipv4Vector = (IntVector) curFieldVector;
+                        } else {
+                            ipv4Vector = (UInt4Vector) curFieldVector;
+                        }
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; 
rowIndex++) {
+                            Object fieldValue = ipv4Vector.isNull(rowIndex) ? 
null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+                            addValueToRow(rowIndex, fieldValue);
+                        }
+                        break;
                     case "FLOAT":
                         
Preconditions.checkArgument(mt.equals(Types.MinorType.FLOAT4),
                                 typeMismatchMessage(currentType, mt));
@@ -314,7 +333,7 @@ public class RowBatch {
                     case "DATE":
                     case "DATEV2":
                         
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR)
-                                        || mt.equals(Types.MinorType.DATEDAY), 
typeMismatchMessage(currentType, mt));
+                                || mt.equals(Types.MinorType.DATEDAY), 
typeMismatchMessage(currentType, mt));
                         if (mt.equals(Types.MinorType.VARCHAR)) {
                             VarCharVector date = (VarCharVector) 
curFieldVector;
                             for (int rowIndex = 0; rowIndex < 
rowCountInOneBatch; rowIndex++) {
@@ -392,6 +411,20 @@ public class RowBatch {
                             addValueToRow(rowIndex, value);
                         }
                         break;
+                    case "IPV6": // value arrives as the decimal string of the 128-bit address
+                        Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR),
+                                typeMismatchMessage(currentType, mt));
+                        VarCharVector ipv6VarcharVector = (VarCharVector) curFieldVector;
+                        for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
+                            if (ipv6VarcharVector.isNull(rowIndex)) {
+                                addValueToRow(rowIndex, null);
+                                continue; // a 'break' here would leave every later row without a value
+                            }
+                            String ipv6Str = new String(ipv6VarcharVector.get(rowIndex),
+                                    java.nio.charset.StandardCharsets.UTF_8);
+                            addValueToRow(rowIndex, IPUtils.fromBigInteger(new BigInteger(ipv6Str)));
+                        }
+                        break;
                     case "ARRAY":
                         
Preconditions.checkArgument(mt.equals(Types.MinorType.LIST),
                                 typeMismatchMessage(currentType, mt));
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index 914190a..76d231a 100644
--- 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -130,6 +130,8 @@ private[spark] object SchemaUtils {
       case "MAP"             => MapType(DataTypes.StringType, 
DataTypes.StringType)
       case "STRUCT"          => DataTypes.StringType
       case "VARIANT"         => DataTypes.StringType
+      case "IPV4"            => DataTypes.StringType
+      case "IPV6"            => DataTypes.StringType
       case "HLL"             =>
         throw new DorisException("Unsupported type " + dorisType)
       case _                             =>
diff --git 
a/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/IPUtils.java 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/IPUtils.java
new file mode 100644
index 0000000..b086d0f
--- /dev/null
+++ 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/util/IPUtils.java
@@ -0,0 +1,204 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.util;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.Arrays;
+
+/**
+ * Formatting helpers that turn Doris IPV4/IPV6 column values into text.
+ *
+ * <p>IPv4 addresses are taken from the low 32 bits of a {@code long} and printed as a dotted
+ * quad. IPv6 addresses are unsigned 128-bit values (a {@link BigInteger} or two 64-bit halves)
+ * printed in compressed hexadecimal; IPv4-mapped ({@code ::ffff:a.b.c.d}) and IPv4-compatible
+ * ({@code ::a.b.c.d}) addresses keep their embedded IPv4 part as a dotted quad.
+ */
+public class IPUtils {
+    /**
+     * Create an IPv6 address string from a non-negative {@link java.math.BigInteger}. The value N
+     * denotes the Nth possible IPv6 address, i.e. the integer is the 128-bit address itself.
+     *
+     * @param bigInteger {@link java.math.BigInteger} value in the range [0, 2^128 - 1]
+     * @return IPv6 address
+     * @throws IllegalArgumentException if the value is negative or wider than 128 bits
+     */
+    public static String fromBigInteger(BigInteger bigInteger) {
+        if (bigInteger.signum() < 0 || bigInteger.bitLength() > 128) {
+            throw new IllegalArgumentException(
+                    "IPv6 value must be within [0, 2^128 - 1], got: " + bigInteger);
+        }
+        byte[] bytes = bigInteger.toByteArray();
+        if (bytes[0] == 0) {
+            // toByteArray() may prepend a 0x00 sign byte, which an unsigned address never needs.
+            bytes = Arrays.copyOfRange(bytes, 1, bytes.length);
+        }
+        bytes = prefixWithZeroBytes(bytes);
+        long[] ipv6Bits = fromByteArray(bytes);
+        return toIPv6String(ipv6Bits[0], ipv6Bits[1]);
+    }
+
+    /** Left-pads a big-endian magnitude of at most 16 bytes with zero bytes to exactly 16. */
+    private static byte[] prefixWithZeroBytes(byte[] original) {
+        byte[] target = new byte[16];
+        System.arraycopy(original, 0, target, 16 - original.length, original.length);
+        return target;
+    }
+
+    /**
+     * Split a 16-byte IPv6 address into its high and low 64-bit halves.
+     *
+     * @param bytes byte array with exactly 16 bytes (interpreted unsigned, big-endian)
+     * @return two-element array {@code {highBits, lowBits}}
+     * @throws IllegalArgumentException if {@code bytes} is null or not 16 bytes long
+     */
+    public static long[] fromByteArray(byte[] bytes) {
+        if (bytes == null || bytes.length != 16) {
+            throw new IllegalArgumentException("Byte array must be exactly 16 bytes long");
+        }
+        ByteBuffer buf = ByteBuffer.wrap(bytes);
+        LongBuffer longBuffer = buf.asLongBuffer();
+        return new long[] {longBuffer.get(), longBuffer.get()};
+    }
+
+    /**
+     * Render an address in RFC 5952 style: lowercase hex groups without leading zeros, with the
+     * first longest run of two or more zero groups replaced by {@code ::}.
+     */
+    private static String toShortHandNotationString(long highBits, long lowBits) {
+        String[] strings = toArrayOfShortStrings(highBits, lowBits);
+        StringBuilder result = new StringBuilder();
+        int[] shortHandNotationPositionAndLength =
+                startAndLengthOfLongestRunOfZeroes(highBits, lowBits);
+        int shortHandNotationPosition = shortHandNotationPositionAndLength[0];
+        int shortHandNotationLength = shortHandNotationPositionAndLength[1];
+        boolean useShortHandNotation = shortHandNotationLength > 1;
+
+        for (int i = 0; i < strings.length; ++i) {
+            if (useShortHandNotation && i == shortHandNotationPosition) {
+                // The previous group already appended its ':' unless the run starts at group 0.
+                result.append(i == 0 ? "::" : ":");
+            } else if (i <= shortHandNotationPosition
+                    || i >= shortHandNotationPosition + shortHandNotationLength) {
+                result.append(strings[i]);
+                if (i < strings.length - 1) {
+                    result.append(":");
+                }
+            }
+        }
+        return result.toString();
+    }
+
+    /** Formats each 16-bit group as lowercase hex without leading zeros. */
+    private static String[] toArrayOfShortStrings(long highBits, long lowBits) {
+        short[] shorts = toShortArray(highBits, lowBits);
+        String[] strings = new String[shorts.length];
+        for (int i = 0; i < shorts.length; ++i) {
+            // Mask to read the group as unsigned; unlike String.format this ignores the locale.
+            strings[i] = Integer.toHexString(shorts[i] & 0xFFFF);
+        }
+        return strings;
+    }
+
+    /** Splits the 128-bit address into eight 16-bit groups, most significant first. */
+    private static short[] toShortArray(long highBits, long lowBits) {
+        short[] shorts = new short[8];
+        for (int i = 0; i < 8; ++i) {
+            long half = inHighRange(i) ? highBits : lowBits;
+            // Move group (i % 4) of this half to the top 16 bits, then down to the bottom.
+            shorts[i] = (short) (half << (i % 4) * 16 >>> 48);
+        }
+        return shorts;
+    }
+
+    /** Returns {start, length} of the first longest run of zero groups, or {-1, 0} if none. */
+    private static int[] startAndLengthOfLongestRunOfZeroes(long highBits, long lowBits) {
+        int longestConsecutiveZeroes = 0;
+        int longestConsecutiveZeroesPos = -1;
+        short[] shorts = toShortArray(highBits, lowBits);
+        for (int pos = 0; pos < shorts.length; ++pos) {
+            int consecutiveZeroesAtCurrentPos = countConsecutiveZeroes(shorts, pos);
+            if (consecutiveZeroesAtCurrentPos > longestConsecutiveZeroes) {
+                longestConsecutiveZeroes = consecutiveZeroesAtCurrentPos;
+                longestConsecutiveZeroesPos = pos;
+            }
+        }
+        return new int[] {longestConsecutiveZeroesPos, longestConsecutiveZeroes};
+    }
+
+    private static boolean inHighRange(int shortNumber) {
+        return shortNumber >= 0 && shortNumber < 4;
+    }
+
+    private static int countConsecutiveZeroes(short[] shorts, int offset) {
+        int count = 0;
+        for (int i = offset; i < shorts.length && shorts[i] == 0; ++i) {
+            ++count;
+        }
+        return count;
+    }
+
+    /**
+     * Format a 128-bit IPv6 address given as two 64-bit halves.
+     *
+     * @param highBits most significant 64 bits
+     * @param lowBits least significant 64 bits
+     * @return IPv4-mapped or IPv4-compatible form when applicable, otherwise compressed hex
+     */
+    public static String toIPv6String(long highBits, long lowBits) {
+        if (isIPv4Mapped(highBits, lowBits)) {
+            return toIPv4MappedAddressString(lowBits);
+        } else if (isIPv4Compatibility(highBits, lowBits)) {
+            return toIPv4CompatibilityAddressString(lowBits);
+        }
+        return toShortHandNotationString(highBits, lowBits);
+    }
+
+    /**
+     * Format the low 32 bits of {@code lowBits} as a dotted-quad IPv4 address. Higher bits are
+     * ignored, so sign-extended negative {@code int} values print like their unsigned form.
+     */
+    public static String convertLongToIPv4Address(long lowBits) {
+        // Concatenation instead of String.format("%d"), whose digits follow the default locale.
+        return ((lowBits >> 24) & 0xff) + "." + ((lowBits >> 16) & 0xff) + "."
+                + ((lowBits >> 8) & 0xff) + "." + (lowBits & 0xff);
+    }
+
+    private static String toIPv4MappedAddressString(long lowBits) {
+        return "::ffff:" + convertLongToIPv4Address(lowBits);
+    }
+
+    private static String toIPv4CompatibilityAddressString(long lowBits) {
+        return "::" + convertLongToIPv4Address(lowBits);
+    }
+
+    /** True for {@code ::ffff:a.b.c.d}: 80 zero bits, 16 one bits, then the IPv4 address. */
+    private static boolean isIPv4Mapped(long highBits, long lowBits) {
+        return highBits == 0L && (lowBits >>> 32) == 0xFFFFL;
+    }
+
+    /**
+     * True for IPv4-compatible {@code ::a.b.c.d}: the top 96 bits are zero but bits 16-31 are
+     * not, so {@code ::}, {@code ::1} and {@code ::ffff} stay hexadecimal. Unsigned shifts keep a
+     * set sign bit in {@code lowBits} from passing as a small value.
+     */
+    private static boolean isIPv4Compatibility(long highBits, long lowBits) {
+        return highBits == 0L && (lowBits >>> 32) == 0L && (lowBits >>> 16) != 0L;
+    }
+}
diff --git 
a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
 
b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index 348895d..0c83050 100644
--- 
a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ 
b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -19,6 +19,7 @@ package org.apache.doris.spark.serialization;
 
 import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.UInt4Vector;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.doris.sdk.thrift.TScanBatchResult;
@@ -73,13 +74,12 @@ import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
-import java.time.ZoneOffset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.TimeZone;
 
 import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertEquals;
 
 public class TestRowBatch {
     private final static Logger logger = 
LoggerFactory.getLogger(TestRowBatch.class);
@@ -481,7 +481,7 @@ public class TestRowBatch {
         root.setRowCount(1);
 
         FieldVector vector = root.getVector("k1");
-        VarCharVector dateVector = (VarCharVector)vector;
+        VarCharVector dateVector = (VarCharVector) vector;
         dateVector.setInitialCapacity(1);
         dateVector.allocateNew();
         dateVector.setIndexDefined(0);
@@ -491,7 +491,7 @@ public class TestRowBatch {
 
 
         vector = root.getVector("k2");
-        VarCharVector dateV2Vector = (VarCharVector)vector;
+        VarCharVector dateV2Vector = (VarCharVector) vector;
         dateV2Vector.setInitialCapacity(1);
         dateV2Vector.allocateNew();
         dateV2Vector.setIndexDefined(0);
@@ -500,7 +500,7 @@ public class TestRowBatch {
         vector.setValueCount(1);
 
         vector = root.getVector("k3");
-        DateDayVector dateNewVector = (DateDayVector)vector;
+        DateDayVector dateNewVector = (DateDayVector) vector;
         dateNewVector.setInitialCapacity(1);
         dateNewVector.allocateNew();
         dateNewVector.setIndexDefined(0);
@@ -563,7 +563,7 @@ public class TestRowBatch {
         root.setRowCount(1);
 
         FieldVector vector = root.getVector("k1");
-        VarCharVector lageIntVector = (VarCharVector)vector;
+        VarCharVector lageIntVector = (VarCharVector) vector;
         lageIntVector.setInitialCapacity(1);
         lageIntVector.allocateNew();
         lageIntVector.setIndexDefined(0);
@@ -573,7 +573,7 @@ public class TestRowBatch {
 
 
         vector = root.getVector("k2");
-        FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector)vector;
+        FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector) vector;
         lageIntVector1.setInitialCapacity(1);
         lageIntVector1.allocateNew();
         lageIntVector1.setIndexDefined(0);
@@ -777,7 +777,7 @@ public class TestRowBatch {
         root.setRowCount(3);
 
         FieldVector vector = root.getVector("k1");
-        VarCharVector datetimeVector = (VarCharVector)vector;
+        VarCharVector datetimeVector = (VarCharVector) vector;
         datetimeVector.setInitialCapacity(3);
         datetimeVector.allocateNew();
         datetimeVector.setIndexDefined(0);
@@ -869,7 +869,7 @@ public class TestRowBatch {
         root.setRowCount(3);
 
         FieldVector vector = root.getVector("k1");
-        VarCharVector datetimeVector = (VarCharVector)vector;
+        VarCharVector datetimeVector = (VarCharVector) vector;
         datetimeVector.setInitialCapacity(3);
         datetimeVector.allocateNew();
         datetimeVector.setIndexDefined(0);
@@ -921,4 +921,244 @@ public class TestRowBatch {
 
     }
 
+    @Test
+    public void testIPv4() throws DorisException, IOException {
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(
+                new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null), // UINT4
+                new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null)); // INT
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(5);
+
+        FieldVector vector = root.getVector("k1");
+        UInt4Vector uInt4Vector = (UInt4Vector) vector;
+        uInt4Vector.setInitialCapacity(5);
+        uInt4Vector.allocateNew();
+        uInt4Vector.setIndexDefined(0);
+        uInt4Vector.setSafe(0, 0);
+        uInt4Vector.setIndexDefined(1);
+        uInt4Vector.setSafe(1, 255);
+        uInt4Vector.setIndexDefined(2);
+        uInt4Vector.setSafe(2, 65535);
+        uInt4Vector.setIndexDefined(3);
+        uInt4Vector.setSafe(3, 16777215);
+        uInt4Vector.setIndexDefined(4);
+        uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
+
+        FieldVector vector1 = root.getVector("k2");
+        IntVector int4Vector = (IntVector) vector1;
+        int4Vector.setInitialCapacity(5);
+        int4Vector.allocateNew();
+        int4Vector.setIndexDefined(0);
+        int4Vector.setSafe(0, 0);
+        int4Vector.setIndexDefined(1);
+        int4Vector.setSafe(1, 255);
+        int4Vector.setIndexDefined(2);
+        int4Vector.setSafe(2, 65535);
+        int4Vector.setIndexDefined(3);
+        int4Vector.setSafe(3, 16777215);
+        int4Vector.setIndexDefined(4);
+        int4Vector.setWithPossibleTruncate(4, 4294967295L); // truncated to a negative int
+
+        vector.setValueCount(5);
+        vector1.setValueCount(5);
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}, "
+                        + "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        assertEquals("0.0.0.0", actualRow0.get(0));
+        assertEquals("0.0.0.0", actualRow0.get(1));
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow1 = rowBatch.next();
+        assertEquals("0.0.0.255", actualRow1.get(0));
+        assertEquals("0.0.0.255", actualRow1.get(1));
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow2 = rowBatch.next();
+        assertEquals("0.0.255.255", actualRow2.get(0));
+        assertEquals("0.0.255.255", actualRow2.get(1));
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow3 = rowBatch.next();
+        assertEquals("0.255.255.255", actualRow3.get(0));
+        assertEquals("0.255.255.255", actualRow3.get(1));
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow4 = rowBatch.next();
+        assertEquals("255.255.255.255", actualRow4.get(0));
+        assertEquals("255.255.255.255", actualRow4.get(1));
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
+
+    @Test
+    public void testIPv6() throws DorisException, IOException {
+        // Each value is the decimal string of the 128-bit address, which RowBatch parses as BigInteger.
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Utf8()), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(13);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector ipv6Vector = (VarCharVector) vector;
+        ipv6Vector.setInitialCapacity(13);
+        ipv6Vector.allocateNew();
+        ipv6Vector.setIndexDefined(0);
+        ipv6Vector.setValueLengthSafe(0, 1);
+        ipv6Vector.setSafe(0, "0".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(1);
+        ipv6Vector.setValueLengthSafe(1, 1);
+        ipv6Vector.setSafe(1, "1".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(2);
+        ipv6Vector.setSafe(2, "65535".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(3);
+        ipv6Vector.setSafe(3, "65536".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(4);
+        ipv6Vector.setSafe(4, "4294967295".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(5);
+        ipv6Vector.setSafe(5, "4294967296".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(6);
+        ipv6Vector.setSafe(6, "8589934591".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(7);
+        ipv6Vector.setSafe(7, "281470681743359".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(8);
+        ipv6Vector.setSafe(8, "281470681743360".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(9);
+        ipv6Vector.setSafe(9, "281474976710655".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(10);
+        ipv6Vector.setSafe(10, "281474976710656".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(11);
+        ipv6Vector.setSafe(11, "340277174624079928635746639885392347137".getBytes(StandardCharsets.UTF_8));
+
+        ipv6Vector.setIndexDefined(12);
+        ipv6Vector.setSafe(12, "340282366920938463463374607431768211455".getBytes(StandardCharsets.UTF_8));
+
+        vector.setValueCount(13);
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + "{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        assertEquals("::", actualRow0.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow1 = rowBatch.next();
+        assertEquals("::1", actualRow1.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow2 = rowBatch.next();
+        assertEquals("::ffff", actualRow2.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow3 = rowBatch.next();
+        assertEquals("::0.1.0.0", actualRow3.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow4 = rowBatch.next();
+        assertEquals("::255.255.255.255", actualRow4.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow5 = rowBatch.next();
+        assertEquals("::1:0:0", actualRow5.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow6 = rowBatch.next();
+        assertEquals("::1:ffff:ffff", actualRow6.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow7 = rowBatch.next();
+        assertEquals("::fffe:ffff:ffff", actualRow7.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow8 = rowBatch.next();
+        assertEquals("::ffff:0.0.0.0", actualRow8.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow9 = rowBatch.next();
+        assertEquals("::ffff:255.255.255.255", actualRow9.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow10 = rowBatch.next();
+        assertEquals("::1:0:0:0", actualRow10.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow11 = rowBatch.next();
+        assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0));
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow12 = rowBatch.next();
+        assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", actualRow12.get(0));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class);
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to