This is an automated email from the ASF dual-hosted git repository.
achennaka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 857c079fa KUDU-1261 [Java] Implement serdes of Array Type column
857c079fa is described below
commit 857c079fae003e32699a0ade1971804680b94b9d
Author: Abhishek Chennaka <[email protected]>
AuthorDate: Wed Oct 1 14:10:47 2025 -0700
KUDU-1261 [Java] Implement serdes of Array Type column
Added a new directory kudu-flatbuffers, similar to the kudu-proto structure,
to hold all FlatBuffers-related build configuration. Generated Java classes
from the existing FlatBuffers schema (array1d.fbs) to represent 1-D scalar
arrays.
Implemented Array1dSerdes for serialization and deserialization of all scalar
types defined in array1d.fbs. Added corresponding test cases for each type to
validate the serdes functionality.
The generated flatbuffer classes are shaded into the Kudu client jar.
Updated verify_jars.pl script accordingly.
Checkstyle and SpotBugs are configured not to run on the auto-generated classes
in kudu-flatbuffers.
Change-Id: Ie1ad9f65fe94c8662ed0e0834ce849e078fc72d2
Reviewed-on: http://gerrit.cloudera.org:8080/23481
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Abhishek Chennaka <[email protected]>
---
build-support/verify_jars.pl | 2 +-
java/config/spotbugs/excludeFilter.xml | 14 +
java/gradle/dependencies.gradle | 2 +
java/gradle/quality.gradle | 8 +-
java/gradle/shadow.gradle | 1 +
java/kudu-client/build.gradle | 2 +
.../java/org/apache/kudu/client/Array1dSerdes.java | 479 +++++++++++++++++++++
.../org/apache/kudu/client/TestArraySerdes.java | 309 +++++++++++++
java/kudu-flatbuffers/build.gradle | 87 ++++
java/settings.gradle | 1 +
10 files changed, 902 insertions(+), 3 deletions(-)
diff --git a/build-support/verify_jars.pl b/build-support/verify_jars.pl
index e44a224f4..adb95f130 100755
--- a/build-support/verify_jars.pl
+++ b/build-support/verify_jars.pl
@@ -38,7 +38,7 @@ my $pat_allow_non_java =
# Allowed filenames of shaded dependencies in JARs.
my $pat_allow_kudu_shaded =
qr{^org/apache/kudu/shaded/
-
(?:com/google/(?:common|gson|gradle/osdetector|protobuf|thirdparty/publicsuffix)|
+
(?:com/google/(?:common|flatbuffers|gson|gradle/osdetector|protobuf|thirdparty/publicsuffix)|
com/sangupta/murmur|
kr/motd/maven|
org/apache/(?:commons|http)|
diff --git a/java/config/spotbugs/excludeFilter.xml
b/java/config/spotbugs/excludeFilter.xml
index 91dde282c..e713eaec2 100644
--- a/java/config/spotbugs/excludeFilter.xml
+++ b/java/config/spotbugs/excludeFilter.xml
@@ -30,6 +30,13 @@
<Class name="~org\.apache\.kudu\.rpc\.RpcHeader.*"/>
</Or>
</Match>
+ <!-- Ignore classes generated by FlatBuffers. -->
+ <Match>
+ <Or>
+ <!-- All classes under the serdes package -->
+ <Class name="~org\.apache\.kudu\.serdes\..*"/>
+ </Or>
+ </Match>
<Match>
<!-- Spotbugs works better with Java than with Scala. We suppress some
categories of
bug reports when using Scala, since spotbugs generates huge numbers of
false positives
@@ -181,6 +188,13 @@
</Or>
<Bug pattern="NM_CLASS_NOT_EXCEPTION" />
</Match>
+ <Match>
+ <!-- Intentional: test verifies that serializeInt32(null, null) encodes
+ a null array as an empty array. -->
+ <Class name="org.apache.kudu.client.TestArraySerdes"/>
+ <Method name="testInt32RoundTripNullArray"/>
+ <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE"/>
+ </Match>
<Match>
<!-- Though returning null is a bad practice, changing this now breaks
expectations. -->
<Or>
diff --git a/java/gradle/dependencies.gradle b/java/gradle/dependencies.gradle
index d498ae727..6ae8b6b1c 100755
--- a/java/gradle/dependencies.gradle
+++ b/java/gradle/dependencies.gradle
@@ -33,6 +33,7 @@ versions += [
commonsIo : "2.15.0",
errorProne : "2.3.3",
errorProneJavac : "9+181-r4173-1",
+ flatbuffers : "25.2.10",
flinkConnectorKudu : "2.0.0-1.19",
flink : "1.19.2",
gradle : "7.6.4",
@@ -94,6 +95,7 @@ libs += [
commonsIo : "commons-io:commons-io:$versions.commonsIo",
errorProne :
"com.google.errorprone:error_prone_core:$versions.errorProne",
errorProneJavac :
"com.google.errorprone:javac:$versions.errorProneJavac",
+ flatbuffersJava :
"com.google.flatbuffers:flatbuffers-java:$versions.flatbuffers",
flinkClients : "org.apache.flink:flink-clients:$versions.flink",
flinkConnectorBase :
"org.apache.flink:flink-connector-base:$versions.flink",
flinkCore : "org.apache.flink:flink-core:$versions.flink",
diff --git a/java/gradle/quality.gradle b/java/gradle/quality.gradle
index 72ba51757..ffc10f3b9 100644
--- a/java/gradle/quality.gradle
+++ b/java/gradle/quality.gradle
@@ -31,6 +31,8 @@ if (propertyExists("ignoreCheckFailures")) {
// code style verification tasks since the code is assumed to be already
// passed those in various pre-commit builds. For that, define the
// 'skipCodeStyleChecks' property.
+// We also do not need to run it on kudu-flatbuffers module as it only contains
+// auto-generated code.
if (!propertyExists("skipCodeStyleChecks")) {
// Ensures Java code follows the defined coding style.
apply plugin: "checkstyle"
@@ -112,9 +114,11 @@ if(!JavaVersion.current().isJava11Compatible()) {
}
}
-// Don't enable code coverage for kudu-proto given it is exclusively generated
code.
+// Don't enable code coverage for kudu-proto and kudu-flatbuffers given it is
exclusively
+// generated code.
// Don't enable code coverage for kudu-jepsen given it is exclusively test
code.
-if (project.name != "kudu-proto" && project.name != "kudu-jepsen") {
+if (project.name != "kudu-proto" && project.name != "kudu-jepsen" &&
+ project.name != "kudu-flatbuffers") {
apply plugin: "jacoco" // Provides code coverage metrics for Java code.
jacoco {
toolVersion = versions.jacoco
diff --git a/java/gradle/shadow.gradle b/java/gradle/shadow.gradle
index 0d8f473dd..959ae4db5 100644
--- a/java/gradle/shadow.gradle
+++ b/java/gradle/shadow.gradle
@@ -63,6 +63,7 @@ shadowJar {
// https://docs.gradle.org/current/userguide/compatibility.html#java_runtime
exclude "META-INF/versions/2?/"
relocate "com.google.common", "org.apache.kudu.shaded.com.google.common"
+ relocate "com.google.flatbuffers",
"org.apache.kudu.shaded.com.google.flatbuffers"
relocate "com.google.gradle.osdetector",
"org.apache.kudu.shaded.com.google.gradle.osdetector"
relocate "com.google.gson", "org.apache.kudu.shaded.com.google.gson"
relocate "com.google.protobuf", "org.apache.kudu.shaded.com.google.protobuf"
diff --git a/java/kudu-client/build.gradle b/java/kudu-client/build.gradle
index bc0d6d96a..085b0f517 100644
--- a/java/kudu-client/build.gradle
+++ b/java/kudu-client/build.gradle
@@ -23,7 +23,9 @@ apply from: "$rootDir/gradle/shadow.gradle"
apply from: "$rootDir/gradle/benchmarks.gradle"
dependencies {
+ implementation project(path: ":kudu-flatbuffers")
implementation project(path: ":kudu-proto")
+ compileOnly libs.flatbuffersJava
compileOnly libs.protobufJava
// Not shaded in the client JAR because it's part of the public API.
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Array1dSerdes.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Array1dSerdes.java
new file mode 100644
index 000000000..e9c2f31c6
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Array1dSerdes.java
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.Objects;
+
+import com.google.flatbuffers.FlatBufferBuilder;
+import com.google.flatbuffers.Table;
+
+// FlatBuffers generated classes
+import org.apache.kudu.serdes.BinaryArray;
+import org.apache.kudu.serdes.Content;
+import org.apache.kudu.serdes.DoubleArray;
+import org.apache.kudu.serdes.FloatArray;
+import org.apache.kudu.serdes.Int16Array;
+import org.apache.kudu.serdes.Int32Array;
+import org.apache.kudu.serdes.Int64Array;
+import org.apache.kudu.serdes.Int8Array;
+import org.apache.kudu.serdes.ScalarArray;
+import org.apache.kudu.serdes.StringArray;
+import org.apache.kudu.serdes.UInt16Array;
+import org.apache.kudu.serdes.UInt32Array;
+import org.apache.kudu.serdes.UInt64Array;
+import org.apache.kudu.serdes.UInt8Array;
+
+public class Array1dSerdes {
+
+ // ---------------- Result container ----------------
+ /**
+ * Holder for the outcome of a parse call: the decoded values array plus one
+ * validity flag per element. Both arrays are stored and returned by
+ * reference (no defensive copies are made).
+ */
+ public static class ArrayResult {
+ // Decoded array; its concrete type depends on the parse method used
+ // (e.g. byte[], int[], long[], String[], byte[][]).
+ private final Object values;
+ // Per-element flags as produced by buildValidity().
+ private final boolean[] validity;
+
+ public ArrayResult(Object values, boolean[] validity) {
+ this.values = values;
+ this.validity = validity;
+ }
+
+ /** Returns the decoded array; callers cast it to the expected array type. */
+ public Object getValues() {
+ return values;
+ }
+
+ /** Returns the per-element validity flags. */
+ public boolean[] getValidity() {
+ return validity;
+ }
+ }
+
+ // ---------------- Helper interfaces ----------------
+ // Builds the values vector in the builder and returns its offset.
+ @FunctionalInterface interface CreateVec<V> { int apply(FlatBufferBuilder b,
V values); }
+
+ // Starts the per-type array table.
+ @FunctionalInterface interface Start { void apply(FlatBufferBuilder b); }
+
+ // Attaches a previously created values vector to the started table.
+ @FunctionalInterface interface AddValues { void apply(FlatBufferBuilder b,
int vecOff); }
+
+ // Ends the per-type array table and returns its offset.
+ @FunctionalInterface interface End { int apply(FlatBufferBuilder b); }
+
+ // Extracts the typed array table from the Content union.
+ @FunctionalInterface interface GetUnion<A> { A apply(Content c); }
+
+ // Returns the number of elements in a typed array table.
+ @FunctionalInterface interface ArrLen<A> { int apply(A arr); }
+
+ // Copies element i of the typed array table into the output Java array.
+ @FunctionalInterface interface Copier<A> { void copy(A arr, int i, Object
outArray); }
+
+ // Allocates the output Java array of length m.
+ @FunctionalInterface interface NewArray { Object apply(int m); }
+
+ // ---------------- Common helpers ----------------
+ private static int validityOffset(FlatBufferBuilder b, boolean[] validity) {
+ if (validity == null || validity.length == 0) {
+ return 0;
+ }
+ return Content.createValidityVector(b, validity);
+ }
+
+ /**
+ * Builds the root Content table around an already-written array table.
+ *
+ * @param discriminator union type tag, stored as a byte
+ * @param arrOffset offset of the typed array table
+ * @param validityOffset offset of the validity vector, or 0 for none
+ * @return offset of the finished Content table
+ */
+ private static int finishContent(FlatBufferBuilder b, int discriminator,
+ int arrOffset, int validityOffset) {
+ Content.startContent(b);
+ Content.addDataType(b, (byte) discriminator);
+ Content.addData(b, arrOffset);
+ // An offset of 0 is the "no validity vector" marker from validityOffset().
+ if (validityOffset != 0) {
+ Content.addValidity(b, validityOffset);
+ }
+ return Content.endContent(b);
+ }
+
+ private static Content readContent(byte[] buf) {
+ return
Content.getRootAsContent(ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN));
+ }
+
+ /**
+ * Produces one validity flag per element.
+ *
+ * @param c parsed Content table
+ * @param m number of elements in the values array
+ * @return array of length {@code m}; all true when the buffer stores no
+ *         validity entries
+ * @throws IllegalArgumentException if a validity vector is stored but its
+ *         length differs from {@code m}
+ */
+ private static boolean[] buildValidity(Content c, int m) {
+   final int stored = c.validityLength();
+   final boolean[] flags = new boolean[m];
+
+   if (stored == 0) {
+     // No validity buffer present. By convention, all elements are non-null.
+     Arrays.fill(flags, true);
+   } else if (stored == m) {
+     for (int idx = 0; idx < m; idx++) {
+       flags[idx] = c.validity(idx);
+     }
+   } else {
+     throw new IllegalArgumentException(
+         String.format("Invalid validity length %d (expected %d)", stored, m));
+   }
+   return flags;
+ }
+
+ // ---------------- Int8 ----------------
+ /**
+ * Encodes {@code values} as an Int8Array payload via serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeInt8(byte[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.Int8Array,
+ Int8Array::createValuesVector,
+ Int8Array::startInt8Array,
+ Int8Array::addValues,
+ Int8Array::endInt8Array
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is Int8Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code byte[]}
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseInt8(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.Int8Array,
+ c -> (Int8Array) c.data(new Int8Array()),
+ Int8Array::valuesLength,
+ byte[]::new,
+ (a, i, out) -> ((byte[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- UInt8 ----------------
+ /**
+ * Encodes {@code values} as a UInt8Array payload via serializePrimitive().
+ * Each Java byte is written with its bit pattern unchanged, so e.g.
+ * {@code (byte) 255} is read back as 255 by parseUInt8().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeUInt8(byte[] values, boolean[] validity) {
+   // No intermediate copy: (byte) (v & 0xFF) is the identity for a byte v, so
+   // the input array can be handed to the vector builder directly.
+   return serializePrimitive(
+       values, validity, ScalarArray.UInt8Array,
+       UInt8Array::createValuesVector,
+       UInt8Array::startUInt8Array,
+       UInt8Array::addValues,
+       UInt8Array::endUInt8Array
+   );
+ }
+
+ /**
+ * Decodes a buffer whose union type is UInt8Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is an {@code int[]}, each element
+ *         masked with 0xFF
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseUInt8(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.UInt8Array,
+ c -> (UInt8Array) c.data(new UInt8Array()),
+ UInt8Array::valuesLength,
+ int[]::new,
+ (a, i, out) -> ((int[]) out)[i] = a.values(i) & 0xFF
+ );
+ }
+
+ // ---------------- Int16 ----------------
+ /**
+ * Encodes {@code values} as an Int16Array payload via serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeInt16(short[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.Int16Array,
+ Int16Array::createValuesVector,
+ Int16Array::startInt16Array,
+ Int16Array::addValues,
+ Int16Array::endInt16Array
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is Int16Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code short[]}
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseInt16(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.Int16Array,
+ c -> (Int16Array) c.data(new Int16Array()),
+ Int16Array::valuesLength,
+ short[]::new,
+ (a, i, out) -> ((short[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- UInt16 ----------------
+ /**
+ * Encodes {@code values} as a UInt16Array payload. Each short is widened to
+ * an int in the range 0..65535 before being handed to serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeUInt16(short[] values, boolean[] validity) {
+   final int[] unsigned;
+   if (values == null) {
+     unsigned = null;
+   } else {
+     unsigned = new int[values.length];
+     int pos = 0;
+     for (short v : values) {
+       unsigned[pos++] = v & 0xFFFF;
+     }
+   }
+   return serializePrimitive(
+       unsigned, validity, ScalarArray.UInt16Array,
+       UInt16Array::createValuesVector,
+       UInt16Array::startUInt16Array,
+       UInt16Array::addValues,
+       UInt16Array::endUInt16Array
+   );
+ }
+
+ /**
+ * Decodes a buffer whose union type is UInt16Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is an {@code int[]}, each element
+ *         masked with 0xFFFF
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseUInt16(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.UInt16Array,
+ c -> (UInt16Array) c.data(new UInt16Array()),
+ UInt16Array::valuesLength,
+ int[]::new,
+ (a, i, out) -> ((int[]) out)[i] = a.values(i) & 0xFFFF
+ );
+ }
+
+ // ---------------- Int32 ----------------
+ /**
+ * Encodes {@code values} as an Int32Array payload via serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeInt32(int[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.Int32Array,
+ Int32Array::createValuesVector,
+ Int32Array::startInt32Array,
+ Int32Array::addValues,
+ Int32Array::endInt32Array
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is Int32Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is an {@code int[]}
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseInt32(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.Int32Array,
+ c -> (Int32Array) c.data(new Int32Array()),
+ Int32Array::valuesLength,
+ int[]::new,
+ (a, i, out) -> ((int[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- UInt32 ----------------
+ /**
+ * Encodes {@code values} as a UInt32Array payload. Each int is widened to a
+ * long in the range 0..4294967295 before being handed to
+ * serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeUInt32(int[] values, boolean[] validity) {
+   final long[] unsigned;
+   if (values == null) {
+     unsigned = null;
+   } else {
+     unsigned = new long[values.length];
+     int pos = 0;
+     for (int v : values) {
+       unsigned[pos++] = v & 0xFFFFFFFFL;
+     }
+   }
+   return serializePrimitive(
+       unsigned, validity, ScalarArray.UInt32Array,
+       UInt32Array::createValuesVector,
+       UInt32Array::startUInt32Array,
+       UInt32Array::addValues,
+       UInt32Array::endUInt32Array
+   );
+ }
+
+ /**
+ * Decodes a buffer whose union type is UInt32Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code long[]}, each element
+ *         masked with 0xFFFFFFFFL
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseUInt32(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.UInt32Array,
+ c -> (UInt32Array) c.data(new UInt32Array()),
+ UInt32Array::valuesLength,
+ long[]::new,
+ (a, i, out) -> ((long[]) out)[i] = a.values(i) & 0xFFFFFFFFL
+ );
+ }
+
+ // ---------------- Int64 ----------------
+ /**
+ * Encodes {@code values} as an Int64Array payload via serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeInt64(long[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.Int64Array,
+ Int64Array::createValuesVector,
+ Int64Array::startInt64Array,
+ Int64Array::addValues,
+ Int64Array::endInt64Array
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is Int64Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code long[]}
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseInt64(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.Int64Array,
+ c -> (Int64Array) c.data(new Int64Array()),
+ Int64Array::valuesLength,
+ long[]::new,
+ (a, i, out) -> ((long[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- UInt64 ----------------
+ /**
+ * Encodes {@code values} as a UInt64Array payload via serializePrimitive().
+ * The longs are passed through without conversion.
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeUInt64(long[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.UInt64Array,
+ UInt64Array::createValuesVector,
+ UInt64Array::startUInt64Array,
+ UInt64Array::addValues,
+ UInt64Array::endUInt64Array
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is UInt64Array.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code long[]}; elements are
+ *         copied without masking, so large unsigned values appear negative
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseUInt64(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.UInt64Array,
+ c -> (UInt64Array) c.data(new UInt64Array()),
+ UInt64Array::valuesLength,
+ long[]::new,
+ (a, i, out) -> ((long[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- Float ----------------
+ /**
+ * Encodes {@code values} as a FloatArray payload via serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeFloat(float[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.FloatArray,
+ FloatArray::createValuesVector,
+ FloatArray::startFloatArray,
+ FloatArray::addValues,
+ FloatArray::endFloatArray
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is FloatArray.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code float[]}
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseFloat(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.FloatArray,
+ c -> (FloatArray) c.data(new FloatArray()),
+ FloatArray::valuesLength,
+ float[]::new,
+ (a, i, out) -> ((float[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- Double ----------------
+ /**
+ * Encodes {@code values} as a DoubleArray payload via serializePrimitive().
+ *
+ * @param values elements to encode; may be null
+ * @param validity per-element flags; may be null
+ * @return the finished FlatBuffers bytes
+ */
+ public static byte[] serializeDouble(double[] values, boolean[] validity) {
+ return serializePrimitive(
+ values, validity, ScalarArray.DoubleArray,
+ DoubleArray::createValuesVector,
+ DoubleArray::startDoubleArray,
+ DoubleArray::addValues,
+ DoubleArray::endDoubleArray
+ );
+ }
+
+ /**
+ * Decodes a buffer whose union type is DoubleArray.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code double[]}
+ * @throws IllegalArgumentException if the buffer holds a different union type
+ */
+ public static ArrayResult parseDouble(byte[] buf) {
+ return parsePrimitive(
+ buf, ScalarArray.DoubleArray,
+ c -> (DoubleArray) c.data(new DoubleArray()),
+ DoubleArray::valuesLength,
+ double[]::new,
+ (a, i, out) -> ((double[]) out)[i] = a.values(i)
+ );
+ }
+
+ // ---------------- String ----------------
+ /**
+ * Encodes a string array as a StringArray payload.
+ *
+ * @param values elements to encode; a null array is encoded like an empty
+ *        one, and a null element is written as the empty string
+ * @param validity per-element flags; null or empty means no validity vector
+ *        is written
+ * @return the finished FlatBuffers bytes
+ * @throws IllegalArgumentException if {@code validity} is non-empty and its
+ *         length differs from the number of values; such a buffer would be
+ *         rejected by parseString()
+ */
+ public static byte[] serializeString(String[] values, boolean[] validity) {
+   // Treat a null array as empty instead of failing with a
+   // NullPointerException, matching the null handling of the primitive
+   // serializers.
+   final String[] elems = (values == null) ? new String[0] : values;
+   if (validity != null && validity.length != 0 && validity.length != elems.length) {
+     throw new IllegalArgumentException(
+         String.format("Validity length %d does not match values length %d",
+             validity.length, elems.length));
+   }
+
+   FlatBufferBuilder b = new FlatBufferBuilder();
+   int[] offs = new int[elems.length];
+   for (int i = 0; i < elems.length; i++) {
+     offs[i] = b.createString(elems[i] == null ? "" : elems[i]);
+   }
+   int vec = StringArray.createValuesVector(b, offs);
+   int valOff = validityOffset(b, validity);
+
+   StringArray.startStringArray(b);
+   StringArray.addValues(b, vec);
+   int arrOff = StringArray.endStringArray(b);
+
+   int root = finishContent(b, ScalarArray.StringArray, arrOff, valOff);
+   b.finish(root);
+   return b.sizedByteArray();
+ }
+
+ /**
+ * Decodes a buffer whose union type is StringArray.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code String[]}; elements whose
+ *         validity flag is false are returned as null
+ * @throws IllegalArgumentException if the buffer holds a different union
+ *         type or its validity length does not match the element count
+ */
+ public static ArrayResult parseString(byte[] buf) {
+   Objects.requireNonNull(buf, "buf must not be null");
+   final Content content = readContent(buf);
+   if (content.dataType() != ScalarArray.StringArray) {
+     throw new IllegalArgumentException("Unexpected type");
+   }
+   final StringArray strings = new StringArray();
+   content.data(strings);
+   final int count = strings.valuesLength();
+   // buildValidity() always returns exactly `count` flags, so indexing by
+   // element position needs no extra bounds check.
+   final boolean[] validity = buildValidity(content, count);
+   final String[] decoded = new String[count];
+   for (int idx = 0; idx < count; idx++) {
+     final String raw = strings.values(idx);
+     decoded[idx] = validity[idx] ? raw : null;
+   }
+   return new ArrayResult(decoded, validity);
+ }
+
+ // ---------------- Binary ----------------
+ /**
+ * Encodes an array of byte strings as a BinaryArray payload; each element
+ * is stored as its own UInt8Array table.
+ *
+ * @param values elements to encode; a null array is encoded like an empty
+ *        one, and a null element is written as a zero-length byte string
+ * @param validity per-element flags; null or empty means no validity vector
+ *        is written
+ * @return the finished FlatBuffers bytes
+ * @throws IllegalArgumentException if {@code validity} is non-empty and its
+ *         length differs from the number of values; such a buffer would be
+ *         rejected by parseBinary()
+ */
+ public static byte[] serializeBinary(byte[][] values, boolean[] validity) {
+   // Treat a null array as empty instead of failing with a
+   // NullPointerException, matching the null handling of the primitive
+   // serializers.
+   final byte[][] elems = (values == null) ? new byte[0][] : values;
+   if (validity != null && validity.length != 0 && validity.length != elems.length) {
+     throw new IllegalArgumentException(
+         String.format("Validity length %d does not match values length %d",
+             validity.length, elems.length));
+   }
+
+   FlatBufferBuilder b = new FlatBufferBuilder();
+   int[] elemOffs = new int[elems.length];
+   for (int i = 0; i < elems.length; i++) {
+     byte[] v = (elems[i] == null) ? new byte[0] : elems[i];
+     int vec = UInt8Array.createValuesVector(b, v);
+     elemOffs[i] = UInt8Array.createUInt8Array(b, vec);
+   }
+   int valsVec = BinaryArray.createValuesVector(b, elemOffs);
+   int arrOff = BinaryArray.createBinaryArray(b, valsVec);
+   int valOff = validityOffset(b, validity);
+   int root = finishContent(b, ScalarArray.BinaryArray, arrOff, valOff);
+   b.finish(root);
+   return b.sizedByteArray();
+ }
+
+ /**
+ * Decodes a buffer whose union type is BinaryArray.
+ *
+ * @param buf serialized bytes; must not be null
+ * @return result whose values object is a {@code byte[][]}; every element is
+ *         a non-null array (elements flagged invalid are returned with the
+ *         bytes that were stored for them, not as null)
+ * @throws IllegalArgumentException if the buffer holds a different union
+ *         type or its validity length does not match the element count
+ */
+ public static ArrayResult parseBinary(byte[] buf) {
+   Objects.requireNonNull(buf, "buf must not be null");
+   Content c = readContent(buf);
+   if (c.dataType() != ScalarArray.BinaryArray) {
+     throw new IllegalArgumentException("Unexpected type");
+   }
+   BinaryArray arr = new BinaryArray();
+   c.data(arr);
+   int m = arr.valuesLength();
+   boolean[] validity = buildValidity(c, m);
+   byte[][] out = new byte[m][];
+   for (int i = 0; i < m; i++) {
+     ByteBuffer bb = arr.values(i).valuesAsByteBuffer();
+     if (bb == null) {
+       // The FlatBuffers accessor yields null when the element table has no
+       // values vector at all (possible for buffers written by another
+       // producer); decode that as an empty byte string instead of throwing
+       // a NullPointerException.
+       out[i] = new byte[0];
+       continue;
+     }
+     byte[] v = new byte[bb.remaining()];
+     bb.get(v);
+     out[i] = v;
+   }
+   return new ArrayResult(out, validity);
+ }
+
+ // ---------------- Generic serialize/parse ----------------
+ private static <V> byte[] serializePrimitive(
+ V values, boolean[] validity, int discriminator,
+ CreateVec<V> createVec, Start start, AddValues addValues, End end) {
+
+ if (values != null && validity != null) {
+ int valueLen = java.lang.reflect.Array.getLength(values);
+ if (validity.length != valueLen) {
+ throw new IllegalArgumentException(
+ String.format("Validity length %d does not match values length %d",
+ validity.length, valueLen));
+ }
+ }
+
+ FlatBufferBuilder b = new FlatBufferBuilder();
+ int vecOff = (values == null) ? 0 : createVec.apply(b, values);
+ int valOff = validityOffset(b, validity);
+
+ start.apply(b);
+ if (vecOff != 0) {
+ addValues.apply(b, vecOff);
+ }
+ int arrOff = end.apply(b);
+
+ int root = finishContent(b, discriminator, arrOff, valOff);
+ b.finish(root);
+ return b.sizedByteArray();
+ }
+
+ private static <A> ArrayResult parsePrimitive(
+ byte[] buf, int discriminator,
+ GetUnion<A> getUnion, ArrLen<A> lenFn,
+ NewArray newArray, Copier<A> copy) {
+
+ Objects.requireNonNull(buf, "buf must not be null");
+ Content c = readContent(buf);
+ if (c.dataType() != discriminator) {
+ throw new IllegalArgumentException("Unexpected union type: " +
c.dataType());
+ }
+ A arr = getUnion.apply(c);
+ int m = lenFn.apply(arr);
+ boolean[] validity = buildValidity(c, m);
+
+ Object values = newArray.apply(m);
+ for (int i = 0; i < m; i++) {
+ copy.copy(arr, i, values);
+ }
+
+ return new ArrayResult(values, validity);
+ }
+}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestArraySerdes.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestArraySerdes.java
new file mode 100644
index 000000000..2cbaa285c
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestArraySerdes.java
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.Arrays;
+
+import org.junit.Test;
+
+public class TestArraySerdes {
+ // Helper to box primitive arrays for comparison
+ // Converts a primitive, String or byte[][] array into an Object[] so that
+ // arrays of different element types can be compared uniformly. Primitive
+ // arrays are boxed element by element; String[] and byte[][] are returned
+ // as-is. Throws IllegalArgumentException for any other argument type.
+ // NOTE(review): no test in this class calls this helper at present.
+ private static Object[] toObjectArray(Object arr) {
+ if (arr instanceof int[]) {
+ return Arrays.stream((int[]) arr).boxed().toArray();
+ }
+ if (arr instanceof long[]) {
+ return Arrays.stream((long[]) arr).boxed().toArray();
+ }
+ if (arr instanceof double[]) {
+ return Arrays.stream((double[]) arr).boxed().toArray();
+ }
+ if (arr instanceof float[]) {
+ float[] f = (float[]) arr;
+ Float[] boxed = new Float[f.length];
+ for (int i = 0; i < f.length; i++) {
+ boxed[i] = f[i];
+ }
+ return boxed;
+ }
+ if (arr instanceof short[]) {
+ short[] s = (short[]) arr;
+ Short[] boxed = new Short[s.length];
+ for (int i = 0; i < s.length; i++) {
+ boxed[i] = s[i];
+ }
+ return boxed;
+ }
+ if (arr instanceof byte[]) {
+ byte[] b = (byte[]) arr;
+ Byte[] boxed = new Byte[b.length];
+ for (int i = 0; i < b.length; i++) {
+ boxed[i] = b[i];
+ }
+ return boxed;
+ }
+ if (arr instanceof String[]) {
+ return (String[]) arr;
+ }
+ if (arr instanceof byte[][]) {
+ return (byte[][]) arr;
+ }
+ throw new IllegalArgumentException("Unsupported array type: " +
arr.getClass());
+ }
+
+ // ----------------------------
+ // INT8
+ // ----------------------------
+ // Serializes a signed byte array with mixed validity flags and checks that
+ // parsing returns the same values and flags.
+ @Test
+ public void testInt8RoundTrip() {
+ byte[] vals = {1, -2, 127};
+ boolean[] validity = {true, false, true};
+
+ byte[] buf = Array1dSerdes.serializeInt8(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseInt8(buf);
+
+ assertArrayEquals(vals, (byte[]) res.getValues());
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // INT32
+ // ----------------------------
+ // Serializes an int array with mixed validity flags and checks that
+ // parsing returns the same values and flags.
+ @Test
+ public void testInt32RoundTrip() {
+ int[] vals = {1, -2, 42};
+ boolean[] validity = {true, false, true};
+
+ byte[] buf = Array1dSerdes.serializeInt32(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseInt32(buf);
+
+ assertArrayEquals(vals, (int[]) res.getValues());
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // INT64
+ // ----------------------------
+ // Round-trips a long array that includes Long.MAX_VALUE.
+ @Test
+ public void testInt64RoundTrip() {
+ long[] vals = {1L, -2L, Long.MAX_VALUE};
+ boolean[] validity = {true, true, false};
+
+ byte[] buf = Array1dSerdes.serializeInt64(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseInt64(buf);
+
+ assertArrayEquals(vals, (long[]) res.getValues());
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // INT16
+ // ----------------------------
+ // Round-trips a short array that includes the maximum short value.
+ @Test
+ public void testInt16RoundTrip() {
+ short[] vals = {1, -2, 32767};
+ boolean[] validity = {true, true, true};
+
+ byte[] buf = Array1dSerdes.serializeInt16(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseInt16(buf);
+
+ assertArrayEquals(vals, (short[]) res.getValues());
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // UINT8
+ // ----------------------------
+ // Checks that (byte) 255 is read back as the unsigned value 255 in the
+ // int[] returned by parseUInt8.
+ @Test
+ public void testUInt8RoundTrip() {
+ byte[] vals = {(byte) 0, (byte) 255};
+ boolean[] validity = {true, true};
+
+ byte[] buf = Array1dSerdes.serializeUInt8(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseUInt8(buf);
+
+ int[] parsed = (int[]) res.getValues();
+ assertEquals(0, parsed[0]);
+ assertEquals(255, parsed[1]);
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // UINT16
+ // ----------------------------
+ // Checks that (short) 0xFFFF is read back as the unsigned value 65535 in
+ // the int[] returned by parseUInt16.
+ @Test
+ public void testUInt16RoundTrip() {
+ short[] vals = {0, (short) 0xFFFF};
+ boolean[] validity = {true, true};
+
+ byte[] buf = Array1dSerdes.serializeUInt16(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseUInt16(buf);
+
+ int[] parsed = (int[]) res.getValues();
+ assertEquals(0, parsed[0]);
+ assertEquals(65535, parsed[1]);
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // UINT32
+ // ----------------------------
+ // Checks that the int -1 is read back as the unsigned value 4294967295 in
+ // the long[] returned by parseUInt32.
+ @Test
+ public void testUInt32RoundTrip() {
+ int[] vals = {0, -1};
+ boolean[] validity = {true, true};
+
+ byte[] buf = Array1dSerdes.serializeUInt32(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseUInt32(buf);
+
+ long[] parsed = (long[]) res.getValues();
+ assertEquals(0L, parsed[0]);
+ assertEquals(4294967295L, parsed[1]);
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // UINT64
+ // ----------------------------
+ // UInt64 values travel as Java longs with the same bit pattern, so -1L is
+ // expected back unchanged.
+ @Test
+ public void testUInt64RoundTrip() {
+ long[] vals = {0L, -1L};
+ boolean[] validity = {true, true};
+
+ byte[] buf = Array1dSerdes.serializeUInt64(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseUInt64(buf);
+
+ long[] parsed = (long[]) res.getValues();
+ assertEquals(0L, parsed[0]);
+ assertEquals(-1L, parsed[1]);
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // FLOAT
+ // ----------------------------
+ // Round-trips a float array that includes NaN, compared with a small delta.
+ @Test
+ public void testFloatRoundTrip() {
+ float[] vals = {1.0f, -2.5f, Float.NaN};
+ boolean[] validity = {true, false, true};
+
+ byte[] buf = Array1dSerdes.serializeFloat(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseFloat(buf);
+
+ assertArrayEquals(vals, (float[]) res.getValues(), 0.0001f);
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // DOUBLE
+ // ----------------------------
+ // Round-trips a double array that includes Double.MAX_VALUE, compared with
+ // a small delta.
+ @Test
+ public void testDoubleRoundTrip() {
+ double[] vals = {1.0, -2.5, Double.MAX_VALUE};
+ boolean[] validity = {true, true, false};
+
+ byte[] buf = Array1dSerdes.serializeDouble(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseDouble(buf);
+
+ assertArrayEquals(vals, (double[]) res.getValues(), 0.0001);
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // STRING
+ // ----------------------------
+ // A null element flagged invalid is written as "" and must come back as
+ // null, so the parsed array equals the input array.
+ @Test
+ public void testStringRoundTrip() {
+ String[] vals = {"foo", "bar", null};
+ boolean[] validity = {true, true, false};
+
+ byte[] buf = Array1dSerdes.serializeString(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseString(buf);
+
+ assertArrayEquals(vals, (String[]) res.getValues());
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // BINARY
+ // ----------------------------
+ // Valid elements must round-trip byte for byte; the null element flagged
+ // invalid is expected back as a zero-length array rather than null.
+ @Test
+ public void testBinaryRoundTrip() {
+ byte[][] vals = {
+ new byte[] {1, 2, 3},
+ new byte[] {10, 20},
+ null
+ };
+ boolean[] validity = {true, true, false};
+
+ byte[] buf = Array1dSerdes.serializeBinary(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseBinary(buf);
+
+ byte[][] decoded = (byte[][]) res.getValues();
+
+ assertEquals(vals.length, decoded.length);
+ for (int i = 0; i < vals.length; i++) {
+ if (validity[i]) {
+ assertArrayEquals(vals[i], decoded[i]);
+ } else {
+ assertEquals(0, decoded[i].length);
+ }
+ }
+
+ assertArrayEquals(validity, res.getValidity());
+ }
+
+ // ----------------------------
+ // INT32 edge cases
+ // ----------------------------
+ // An empty values array with an empty validity array must parse back to
+ // two zero-length arrays.
+ @Test
+ public void testInt32RoundTripEmptyArray() {
+ int[] vals = {};
+ boolean[] validity = {};
+
+ byte[] buf = Array1dSerdes.serializeInt32(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseInt32(buf);
+
+ assertEquals(0, ((int[]) res.getValues()).length);
+ assertEquals(0, res.getValidity().length);
+ }
+
+ // Null and empty arrays both deserialize to int[0] because FlatBuffers treats
+ // “absent” and “empty” vectors the same. The actual null vs. empty distinction
+ // is tracked separately via the cell’s non-null bitmap.
+ // Passing null for both values and validity must still yield a parseable
+ // buffer whose values and validity arrays are non-null and empty.
+ @Test
+ public void testInt32RoundTripNullArray() {
+ int[] vals = null;
+ boolean[] validity = null;
+
+ byte[] buf = Array1dSerdes.serializeInt32(vals, validity);
+ Array1dSerdes.ArrayResult res = Array1dSerdes.parseInt32(buf);
+
+ // Expect the result to represent an empty array.
+ assertNotNull(res.getValues());
+ assertEquals(0, ((int[]) res.getValues()).length);
+
+ // Validity should also be non-null and length 0
+ assertNotNull(res.getValidity());
+ assertEquals(0, res.getValidity().length);
+ }
+}
diff --git a/java/kudu-flatbuffers/build.gradle
b/java/kudu-flatbuffers/build.gradle
new file mode 100644
index 000000000..af8e15732
--- /dev/null
+++ b/java/kudu-flatbuffers/build.gradle
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+plugins {
+ id("java-library")
+}
+
+dependencies {
+ implementation libs.flatbuffersJava
+}
+
+def fbsDir = file("$rootDir/../src/kudu/common/serdes")
+def flatcOutputDir = file("$buildDir/generated/flatbuffers")
+
+// Allow override with -Pflatc=/path/to/flatc or FLATC env var
+// Explicit flatc location: the 'flatc' project property wins over the FLATC
+// environment variable; null/empty when neither is set.
+def flatcOverride = (findProperty("flatc") ?: System.getenv("FLATC")) as String
+// Absolute path of the flatc binary used by the generateFlatBuffers task.
+def flatcExe
+
+// Resolved while the build script is evaluated: a missing binary fails the
+// build with a GradleException at that point.
+if (flatcOverride) {
+ def f = file(flatcOverride)
+ if (!f.exists()) {
+ throw new GradleException("FLATC override not found: $flatcOverride")
+ }
+ flatcExe = f.absolutePath
+} else{
+ // Fall back to the flatc binary under the repository's thirdparty tree.
+ def flatcPath =
file("${projectDir}/../../thirdparty/installed/uninstrumented/bin/flatc")
+ if (!flatcPath.exists()) {
+ throw new GradleException("No flatc binary found at ${flatcPath}")
+ }
+ flatcExe = flatcPath.absolutePath
+ }
+
+logger.lifecycle("Using flatc: $flatcExe")
+
+// Runs flatc over every .fbs schema in fbsDir and writes the generated Java
+// sources to flatcOutputDir.
+tasks.register("generateFlatBuffers", Exec) {
+  def fbsSources = fileTree(dir: fbsDir, include: "**/*.fbs")
+  inputs.files fbsSources
+  outputs.dir flatcOutputDir
+
+  // Decide whether to run before the task's actions start. Flipping 'enabled'
+  // from inside doFirst is too late: the Exec action would still fire, with
+  // no command line configured, and fail the build.
+  onlyIf {
+    if (fbsSources.isEmpty()) {
+      println "No .fbs files found in ${fbsDir}. Skipping FlatBuffers generation."
+      return false
+    }
+    return true
+  }
+
+  doFirst {
+    flatcOutputDir.mkdirs()
+    // The schema list is resolved at execution time so it reflects the files
+    // actually present when the task runs.
+    commandLine = [flatcExe, "--java", "--java-package-prefix", "org.apache.",
+                   "-o", flatcOutputDir.absolutePath] + fbsSources.files.collect { it.path }
+  }
+}
+
+sourceSets {
+ main {
+ java {
+ srcDir flatcOutputDir
+ }
+ }
+}
+
+compileJava.dependsOn(tasks.named("generateFlatBuffers"))
+
+// kudu-flatbuffers has no public Javadoc.
+javadoc {
+ enabled = false
+}
+
+// Disable checkstyle
+tasks.withType(Checkstyle).configureEach {
+ enabled = false
+}
+// Skip publishing kudu-flatbuffers artifact because it will always be shaded
into kudu-client.
+publish.enabled = false
+publishToMavenLocal.enabled = false
diff --git a/java/settings.gradle b/java/settings.gradle
index 62a7639d1..ca09f32ec 100644
--- a/java/settings.gradle
+++ b/java/settings.gradle
@@ -23,6 +23,7 @@ include "kudu-backup"
include "kudu-backup-common"
include "kudu-backup-tools"
include "kudu-client"
+include "kudu-flatbuffers"
include "kudu-hive"
include "kudu-jepsen"
include "kudu-proto"