This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 06253be [feature]support read ipv4/ipv6 type (#365)
06253be is described below
commit 06253be8e0a218709799a316688c6dfb8b5375a3
Author: Petrichor <[email protected]>
AuthorDate: Mon Apr 15 14:46:55 2024 +0800
[feature]support read ipv4/ipv6 type (#365)
---
.../apache/doris/flink/catalog/DorisCatalog.java | 4 +-
.../doris/flink/catalog/DorisTypeMapper.java | 6 +
.../doris/flink/catalog/doris/DorisType.java | 3 +
.../apache/doris/flink/serialization/RowBatch.java | 41 +++-
.../java/org/apache/doris/flink/util/IPUtils.java | 201 +++++++++++++++++++
.../doris/flink/serialization/TestRowBatch.java | 220 ++++++++++++++++++++-
6 files changed, 464 insertions(+), 11 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 96518d3..a09e8fc 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -90,7 +90,7 @@ public class DorisCatalog extends AbstractCatalog {
private static final Logger LOG =
LoggerFactory.getLogger(DorisCatalog.class);
private DorisSystem dorisSystem;
- private DorisConnectionOptions connectionOptions;
+ private final DorisConnectionOptions connectionOptions;
private final Map<String, String> properties;
public DorisCatalog(
@@ -168,7 +168,7 @@ public class DorisCatalog extends AbstractCatalog {
throw new DatabaseNotExistException(getName(), name);
}
- if (!cascade && listTables(name).size() > 0) {
+ if (!cascade && !listTables(name).isEmpty()) {
throw new DatabaseNotEmptyException(getName(), name);
}
dorisSystem.dropDatabase(name);
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index e125a30..bbabd80 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -53,6 +53,8 @@ import static
org.apache.doris.flink.catalog.doris.DorisType.DECIMAL_V3;
import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.IPV4;
+import static org.apache.doris.flink.catalog.doris.DorisType.IPV6;
import static org.apache.doris.flink.catalog.doris.DorisType.JSON;
import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
@@ -62,6 +64,7 @@ import static
org.apache.doris.flink.catalog.doris.DorisType.STRING;
import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT;
import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
+import static org.apache.doris.flink.catalog.doris.DorisType.VARIANT;
public class DorisTypeMapper {
@@ -113,6 +116,9 @@ public class DorisTypeMapper {
case ARRAY:
case MAP:
case STRUCT:
+ case IPV4:
+ case IPV6:
+ case VARIANT:
return DataTypes.STRING();
case DATE:
case DATE_V2:
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
index 3779143..b2b3776 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -42,4 +42,7 @@ public class DorisType {
public static final String JSON = "JSON";
public static final String MAP = "MAP";
public static final String STRUCT = "STRUCT";
+ public static final String VARIANT = "VARIANT";
+ public static final String IPV4 = "IPV4";
+ public static final String IPV6 = "IPV6";
}
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index c6dfa0e..9e87bde 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -32,6 +32,7 @@ import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -44,6 +45,7 @@ import org.apache.arrow.vector.types.Types;
import org.apache.doris.flink.exception.DorisException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.flink.util.IPUtils;
import org.apache.doris.sdk.thrift.TScanBatchResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,12 +65,14 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
+import static org.apache.doris.flink.util.IPUtils.convertLongToIPv4Address;
+
/** row batch data container. */
public class RowBatch {
- private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
+ private static final Logger logger =
LoggerFactory.getLogger(RowBatch.class);
public static class Row {
- private List<Object> cols;
+ private final List<Object> cols;
Row(int colCount) {
this.cols = new ArrayList<>(colCount);
@@ -84,10 +88,10 @@ public class RowBatch {
}
// offset for iterate the rowBatch
- private int offsetInRowBatch = 0;
+ private int offsetInRowBatch;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
- private List<Row> rowBatch = new ArrayList<>();
+ private final List<Row> rowBatch = new ArrayList<>();
private final ArrowStreamReader arrowStreamReader;
private VectorSchemaRoot root;
private List<FieldVector> fieldVectors;
@@ -149,10 +153,7 @@ public class RowBatch {
}
public boolean hasNext() {
- if (offsetInRowBatch < readRowCount) {
- return true;
- }
- return false;
+ return offsetInRowBatch < readRowCount;
}
private void addValueToRow(int rowIndex, Object obj) {
@@ -226,6 +227,17 @@ public class RowBatch {
fieldValue = intVector.isNull(rowIndex) ? null :
intVector.get(rowIndex);
addValueToRow(rowIndex, fieldValue);
break;
+ case "IPV4":
+ if (!minorType.equals(Types.MinorType.UINT4)) {
+ return false;
+ }
+ UInt4Vector ipv4Vector = (UInt4Vector) fieldVector;
+ fieldValue =
+ ipv4Vector.isNull(rowIndex)
+ ? null
+ :
convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
+ addValueToRow(rowIndex, fieldValue);
+ break;
case "BIGINT":
if (!minorType.equals(Types.MinorType.BIGINT)) {
return false;
@@ -390,6 +402,19 @@ public class RowBatch {
String stringValue = new String(varCharVector.get(rowIndex));
addValueToRow(rowIndex, stringValue);
break;
+ case "IPV6":
+ if (!minorType.equals(Types.MinorType.VARCHAR)) {
+ return false;
+ }
+ VarCharVector ipv6VarcharVector = (VarCharVector) fieldVector;
+ if (ipv6VarcharVector.isNull(rowIndex)) {
+ addValueToRow(rowIndex, null);
+ break;
+ }
+ String ipv6Str = new String(ipv6VarcharVector.get(rowIndex));
+ String ipv6Address = IPUtils.fromBigInteger(new
BigInteger(ipv6Str));
+ addValueToRow(rowIndex, ipv6Address);
+ break;
case "ARRAY":
if (!minorType.equals(Types.MinorType.LIST)) {
return false;
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java
new file mode 100644
index 0000000..a834bbf
--- /dev/null
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/util/IPUtils.java
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.util;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
+import java.util.Arrays;
+
+public class IPUtils {
+ /**
+ * Create an IPv6 address from a (positive) {@link java.math.BigInteger}.
The magnitude of the
+ * {@link java.math.BigInteger} represents the IPv6 address value. Or in
other words, the {@link
+ * java.math.BigInteger} with value N defines the Nth possible IPv6
address.
+ *
+ * @param bigInteger {@link java.math.BigInteger} value
+ * @return IPv6 address
+ */
+ public static String fromBigInteger(BigInteger bigInteger) {
+ byte[] bytes = bigInteger.toByteArray();
+ if (bytes[0] == 0) {
+ bytes = Arrays.copyOfRange(bytes, 1, bytes.length); // Skip
leading zero byte
+ }
+ bytes = prefixWithZeroBytes(bytes);
+ long[] ipv6Bits = fromByteArray(bytes);
+ return toIPv6String(ipv6Bits[0], ipv6Bits[1]);
+ }
+
+ private static byte[] prefixWithZeroBytes(byte[] original) {
+ byte[] target = new byte[16];
+ System.arraycopy(original, 0, target, 16 - original.length,
original.length);
+ return target;
+ }
+
+ /**
+ * Create an IPv6 address from a byte array.
+ *
+ * @param bytes byte array with 16 bytes (interpreted unsigned)
+ * @return IPv6 address
+ */
+ public static long[] fromByteArray(byte[] bytes) {
+ if (bytes == null || bytes.length != 16) {
+ throw new IllegalArgumentException("Byte array must be exactly 16
bytes long");
+ }
+ ByteBuffer buf = ByteBuffer.wrap(bytes);
+ LongBuffer longBuffer = buf.asLongBuffer();
+ return new long[] {longBuffer.get(), longBuffer.get()};
+ }
+
+ private static String toShortHandNotationString(long highBits, long
lowBits) {
+ String[] strings = toArrayOfShortStrings(highBits, lowBits);
+ StringBuilder result = new StringBuilder();
+ int[] shortHandNotationPositionAndLength =
+ startAndLengthOfLongestRunOfZeroes(highBits, lowBits);
+ int shortHandNotationPosition = shortHandNotationPositionAndLength[0];
+ int shortHandNotationLength = shortHandNotationPositionAndLength[1];
+ boolean useShortHandNotation = shortHandNotationLength > 1;
+
+ for (int i = 0; i < strings.length; ++i) {
+ if (useShortHandNotation && i == shortHandNotationPosition) {
+ if (i == 0) {
+ result.append("::");
+ } else {
+ result.append(":");
+ }
+ } else if (i <= shortHandNotationPosition
+ || i >= shortHandNotationPosition +
shortHandNotationLength) {
+ result.append(strings[i]);
+ if (i < 7) {
+ result.append(":");
+ }
+ }
+ }
+
+ return result.toString().toLowerCase();
+ }
+
+ private static String[] toArrayOfShortStrings(long highBits, long lowBits)
{
+ short[] shorts = toShortArray(highBits, lowBits);
+ String[] strings = new String[shorts.length];
+
+ for (int i = 0; i < shorts.length; ++i) {
+ strings[i] = String.format("%x", shorts[i]);
+ }
+
+ return strings;
+ }
+
+ private static short[] toShortArray(long highBits, long lowBits) {
+ short[] shorts = new short[8];
+
+ for (int i = 0; i < 8; ++i) {
+ if (inHighRange(i)) {
+ shorts[i] = (short) ((int) (highBits << i * 16 >>> 48 &
0xFFFF));
+ } else {
+ shorts[i] = (short) ((int) (lowBits << i * 16 >>> 48 &
0xFFFF));
+ }
+ }
+
+ return shorts;
+ }
+
+ private static int[] startAndLengthOfLongestRunOfZeroes(long highBits,
long lowBits) {
+ int longestConsecutiveZeroes = 0;
+ int longestConsecutiveZeroesPos = -1;
+ short[] shorts = toShortArray(highBits, lowBits);
+
+ for (int pos = 0; pos < shorts.length; ++pos) {
+ int consecutiveZeroesAtCurrentPos = countConsecutiveZeroes(shorts,
pos);
+ if (consecutiveZeroesAtCurrentPos > longestConsecutiveZeroes) {
+ longestConsecutiveZeroes = consecutiveZeroesAtCurrentPos;
+ longestConsecutiveZeroesPos = pos;
+ }
+ }
+
+ return new int[] {longestConsecutiveZeroesPos,
longestConsecutiveZeroes};
+ }
+
+ private static boolean inHighRange(int shortNumber) {
+ return shortNumber >= 0 && shortNumber < 4;
+ }
+
+ private static int countConsecutiveZeroes(short[] shorts, int offset) {
+ int count = 0;
+
+ for (int i = offset; i < shorts.length && shorts[i] == 0; ++i) {
+ ++count;
+ }
+
+ return count;
+ }
+
+ public static String toIPv6String(long highBits, long lowBits) {
+
+ if (isIPv4Mapped(highBits, lowBits)) {
+ return toIPv4MappedAddressString(lowBits);
+ } else if (isIPv4Compatibility(highBits, lowBits)) {
+ return toIPv4CompatibilityAddressString(lowBits);
+ }
+
+ return toShortHandNotationString(highBits, lowBits);
+ }
+
+ public static String convertLongToIPv4Address(long lowBits) {
+ return String.format(
+ "%d.%d.%d.%d",
+ (lowBits >> 24) & 0xff,
+ (lowBits >> 16) & 0xff,
+ (lowBits >> 8) & 0xff,
+ lowBits & 0xff);
+ }
+
+ private static String toIPv4MappedAddressString(long lowBits) {
+ return "::ffff:" + convertLongToIPv4Address(lowBits);
+ }
+
+ private static String toIPv4CompatibilityAddressString(long lowBits) {
+ return "::" + convertLongToIPv4Address(lowBits);
+ }
+
+ /**
+ * Returns true if the address is an IPv4-mapped IPv6 address. In these
addresses, the first 80
+ * bits are zero, the next 16 bits are one, and the remaining 32 bits are
the IPv4 address.
+ *
+ * @return true if the address is an IPv4-mapped IPv6 addresses.
+ */
+ private static boolean isIPv4Mapped(long highBits, long lowBits) {
+ return highBits == 0
+ && (lowBits & 0xFFFF000000000000L) == 0
+ && (lowBits & 0x0000FFFF00000000L) == 0x0000FFFF00000000L;
+ }
+
+ /**
+ * Checks if the given IPv6 address is in IPv4 compatibility format. IPv4
compatibility format
+ * is characterized by having the high 96 bits of the IPv6 address set to
zero, while the low 32
+ * bits represent an IPv4 address. The criteria for determining IPv4
compatibility format are as
+ * follows: 1. The high 96 bits of the IPv6 address are all zeros. 2. The
low 32 bits are within
+ * the range from 0 to 4294967295 (0xFFFFFFFF). 3. The first 16 bits of
the low 32 bits are all
+ * ones (0xFFFF), indicating the special identifier for IPv4 compatibility
format.
+ *
+ * @return True if the given IPv6 address is in IPv4 compatibility format;
otherwise, false.
+ */
+ private static boolean isIPv4Compatibility(long highBits, long lowBits) {
+ return highBits == 0L && lowBits <= 0xFFFFFFFFL && (lowBits & 65536L)
== 65536L;
+ }
+}
diff --git
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 481004b..29e48b6 100644
---
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -35,6 +35,7 @@ import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -79,6 +80,7 @@ import java.util.List;
import java.util.NoSuchElementException;
import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertEquals;
public class TestRowBatch {
private static Logger logger = LoggerFactory.getLogger(TestRowBatch.class);
@@ -939,7 +941,6 @@ public class TestRowBatch {
Schema schema = RestService.parseSchema(schemaStr, logger);
RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
-
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));
@@ -955,4 +956,221 @@ public class TestRowBatch {
thrown.expectMessage(startsWith("Get row offset:"));
rowBatch.next();
}
+
+ @Test
+ public void testIPV6() throws DorisException, IOException {
+
+ ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ childrenBuilder.add(new Field("k1", FieldType.nullable(new
ArrowType.Utf8()), null));
+
+ VectorSchemaRoot root =
+ VectorSchemaRoot.create(
+ new org.apache.arrow.vector.types.pojo.Schema(
+ childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ArrowStreamWriter arrowStreamWriter =
+ new ArrowStreamWriter(
+ root, new DictionaryProvider.MapDictionaryProvider(),
outputStream);
+
+ arrowStreamWriter.start();
+ root.setRowCount(13);
+
+ FieldVector vector = root.getVector("k1");
+ VarCharVector ipv6Vector = (VarCharVector) vector;
+ ipv6Vector.setInitialCapacity(13);
+ ipv6Vector.allocateNew();
+ ipv6Vector.setIndexDefined(0);
+ ipv6Vector.setValueLengthSafe(0, 1);
+ ipv6Vector.setSafe(0, "0".getBytes());
+
+ ipv6Vector.setIndexDefined(1);
+ ipv6Vector.setValueLengthSafe(0, 1);
+ ipv6Vector.setSafe(1, "1".getBytes());
+
+ ipv6Vector.setIndexDefined(2);
+ ipv6Vector.setSafe(2, "65535".getBytes());
+
+ ipv6Vector.setIndexDefined(3);
+ ipv6Vector.setSafe(3, "65536".getBytes());
+
+ ipv6Vector.setIndexDefined(4);
+ ipv6Vector.setSafe(4, "4294967295".getBytes());
+
+ ipv6Vector.setIndexDefined(5);
+ ipv6Vector.setSafe(5, "4294967296".getBytes());
+
+ ipv6Vector.setIndexDefined(6);
+ ipv6Vector.setSafe(6, "8589934591".getBytes());
+
+ ipv6Vector.setIndexDefined(7);
+ ipv6Vector.setSafe(7, "281470681743359".getBytes());
+
+ ipv6Vector.setIndexDefined(8);
+ ipv6Vector.setSafe(8, "281470681743360".getBytes());
+
+ ipv6Vector.setIndexDefined(9);
+ ipv6Vector.setSafe(9, "281474976710655".getBytes());
+
+ ipv6Vector.setIndexDefined(10);
+ ipv6Vector.setSafe(10, "281474976710656".getBytes());
+
+ ipv6Vector.setIndexDefined(11);
+ ipv6Vector.setSafe(11,
"340277174624079928635746639885392347137".getBytes());
+
+ ipv6Vector.setIndexDefined(12);
+ ipv6Vector.setSafe(12,
"340282366920938463463374607431768211455".getBytes());
+
+ vector.setValueCount(13);
+ arrowStreamWriter.writeBatch();
+
+ arrowStreamWriter.end();
+ arrowStreamWriter.close();
+
+ TStatus status = new TStatus();
+ status.setStatusCode(TStatusCode.OK);
+ TScanBatchResult scanBatchResult = new TScanBatchResult();
+ scanBatchResult.setStatus(status);
+ scanBatchResult.setEos(false);
+ scanBatchResult.setRows(outputStream.toByteArray());
+
+ String schemaStr =
+ "{\"properties\":["
+ +
"{\"type\":\"IPV6\",\"name\":\"k1\",\"comment\":\"\"}"
+ + "], \"status\":200}";
+
+ Schema schema = RestService.parseSchema(schemaStr, logger);
+
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow0 = rowBatch.next();
+ assertEquals("::", actualRow0.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow1 = rowBatch.next();
+ assertEquals("::1", actualRow1.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow2 = rowBatch.next();
+ assertEquals("::ffff", actualRow2.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow3 = rowBatch.next();
+ assertEquals("::0.1.0.0", actualRow3.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow4 = rowBatch.next();
+ assertEquals("::255.255.255.255", actualRow4.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow5 = rowBatch.next();
+ assertEquals("::1:0:0", actualRow5.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow6 = rowBatch.next();
+ assertEquals("::1:ffff:ffff", actualRow6.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow7 = rowBatch.next();
+ assertEquals("::fffe:ffff:ffff", actualRow7.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow8 = rowBatch.next();
+ assertEquals("::ffff:0.0.0.0", actualRow8.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow9 = rowBatch.next();
+ assertEquals("::ffff:255.255.255.255", actualRow9.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow10 = rowBatch.next();
+ assertEquals("::1:0:0:0", actualRow10.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow11 = rowBatch.next();
+ assertEquals("ffff::1:ffff:ffff:1", actualRow11.get(0));
+
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow12 = rowBatch.next();
+ assertEquals("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
actualRow12.get(0));
+
+ Assert.assertFalse(rowBatch.hasNext());
+ thrown.expect(NoSuchElementException.class);
+ thrown.expectMessage(startsWith("Get row offset:"));
+ rowBatch.next();
+ }
+
+ @Test
+ public void testIPV4() throws DorisException, IOException {
+
+ ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ childrenBuilder.add(
+ new Field("k1", FieldType.nullable(new ArrowType.Int(32,
false)), null));
+
+ VectorSchemaRoot root =
+ VectorSchemaRoot.create(
+ new org.apache.arrow.vector.types.pojo.Schema(
+ childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ArrowStreamWriter arrowStreamWriter =
+ new ArrowStreamWriter(
+ root, new DictionaryProvider.MapDictionaryProvider(),
outputStream);
+
+ arrowStreamWriter.start();
+ root.setRowCount(5);
+
+ FieldVector vector = root.getVector("k1");
+ UInt4Vector uInt4Vector = (UInt4Vector) vector;
+ uInt4Vector.setInitialCapacity(5);
+ uInt4Vector.allocateNew(4);
+ uInt4Vector.setIndexDefined(0);
+ uInt4Vector.setSafe(0, 0);
+ uInt4Vector.setIndexDefined(1);
+ uInt4Vector.setSafe(1, 255);
+ uInt4Vector.setIndexDefined(2);
+ uInt4Vector.setSafe(2, 65535);
+ uInt4Vector.setIndexDefined(3);
+ uInt4Vector.setSafe(3, 16777215);
+ uInt4Vector.setIndexDefined(4);
+ uInt4Vector.setWithPossibleTruncate(4, 4294967295L);
+ vector.setValueCount(5);
+ arrowStreamWriter.writeBatch();
+
+ arrowStreamWriter.end();
+ arrowStreamWriter.close();
+
+ TStatus status = new TStatus();
+ status.setStatusCode(TStatusCode.OK);
+ TScanBatchResult scanBatchResult = new TScanBatchResult();
+ scanBatchResult.setStatus(status);
+ scanBatchResult.setEos(false);
+ scanBatchResult.setRows(outputStream.toByteArray());
+
+ String schemaStr =
+ "{\"properties\":["
+ +
"{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}"
+ + "], \"status\":200}";
+
+ Schema schema = RestService.parseSchema(schemaStr, logger);
+
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow0 = rowBatch.next();
+ assertEquals("0.0.0.0", actualRow0.get(0));
+ List<Object> actualRow1 = rowBatch.next();
+ assertEquals("0.0.0.255", actualRow1.get(0));
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow2 = rowBatch.next();
+ assertEquals("0.0.255.255", actualRow2.get(0));
+ Assert.assertTrue(rowBatch.hasNext());
+ List<Object> actualRow3 = rowBatch.next();
+ assertEquals("0.255.255.255", actualRow3.get(0));
+ List<Object> actualRow4 = rowBatch.next();
+ assertEquals("255.255.255.255", actualRow4.get(0));
+ Assert.assertFalse(rowBatch.hasNext());
+ thrown.expect(NoSuchElementException.class);
+ thrown.expectMessage(startsWith("Get row offset:"));
+ rowBatch.next();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]