This is an automated email from the ASF dual-hosted git repository.
chaokunyang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fory.git
The following commit(s) were added to refs/heads/main by this push:
new bad043fba feat(java/python/c++): add schema encoder and remove arrow
serializers (#2938)
bad043fba is described below
commit bad043fba0f3acbc4b892baace8b075b510a4a90
Author: Shawn Yang <[email protected]>
AuthorDate: Thu Nov 27 17:14:42 2025 +0800
feat(java/python/c++): add schema encoder and remove arrow serializers
(#2938)
## What does this PR do?
- add schema encoder in java/python/c++
- remove arrow serializers
## Related issues
#2929
#2922
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fory/issues/new/choose) describing the
need to do so and update the document if necessary.
Delete section if not applicable.
-->
- [ ] Does this PR introduce any public API change?
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it has an impact on performance, the code reviewer will explain
it), be sure to attach benchmark data here.
Delete section if not applicable.
-->
---
cpp/fory/row/BUILD | 1 +
cpp/fory/row/schema.cc | 268 +++++++++++++++++++++
cpp/fory/row/schema.h | 14 ++
cpp/fory/type/type.h | 4 -
.../src/main/java/org/apache/fory/type/Types.java | 6 -
.../org/apache/fory/format/type/ArrowType.java | 4 +-
.../org/apache/fory/format/type/DataTypes.java | 19 +-
.../org/apache/fory/format/type/SchemaEncoder.java | 254 +++++++++++++++++++
.../fory/format/vectorized/ArrowSerializers.java | 181 --------------
.../apache/fory/format/vectorized/ArrowTable.java | 81 -------
.../format/vectorized/ArrowTableSerializer.java | 74 ------
.../org/apache/fory/format/CrossLanguageTest.java | 264 +-------------------
.../apache/fory/format/type/SchemaEncoderTest.java | 160 ++++++++++++
.../format/vectorized/ArrowSerializersTest.java | 178 --------------
python/pyfory/_registry.py | 15 --
python/pyfory/format/__init__.py | 1 +
python/pyfory/format/infer.py | 112 +++++++++
python/pyfory/format/schema.pxi | 32 +++
python/pyfory/format/serializer.py | 119 ---------
python/pyfory/includes/libformat.pxd | 6 +-
python/pyfory/includes/libserialization.pxd | 2 -
python/pyfory/tests/test_cross_language.py | 121 ++++------
python/pyfory/tests/test_serializer.py | 40 ---
python/pyfory/type.py | 16 +-
rust/fory-core/src/types.rs | 4 -
25 files changed, 910 insertions(+), 1066 deletions(-)
diff --git a/cpp/fory/row/BUILD b/cpp/fory/row/BUILD
index 40ca8c2b7..536083774 100644
--- a/cpp/fory/row/BUILD
+++ b/cpp/fory/row/BUILD
@@ -6,6 +6,7 @@ cc_library(
hdrs = glob(["*.h"]),
strip_include_prefix = "/cpp",
deps = [
+ "//cpp/fory/serialization:fory_serialization",
"//cpp/fory/util:fory_util",
"//cpp/fory/type:fory_type"
],
diff --git a/cpp/fory/row/schema.cc b/cpp/fory/row/schema.cc
new file mode 100644
index 000000000..5d3ef9341
--- /dev/null
+++ b/cpp/fory/row/schema.cc
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "fory/row/schema.h"
+#include "fory/serialization/meta_string.h"
+#include "fory/type/type.h"
+#include <algorithm>
+#include <stdexcept>
+
+namespace fory {
+namespace row {
+
+using namespace serialization;
+
+namespace {
+
+constexpr uint8_t SCHEMA_VERSION = 1;
+constexpr int FIELD_NAME_SIZE_THRESHOLD = 15;
+
+// Field name encodings used for schema serialization
+static const std::vector<MetaEncoding> FIELD_NAME_ENCODINGS = {
+ MetaEncoding::UTF8, MetaEncoding::ALL_TO_LOWER_SPECIAL,
+ MetaEncoding::LOWER_UPPER_DIGIT_SPECIAL};
+
+// Encoder and decoder for field names
+static MetaStringEncoder FIELD_NAME_ENCODER('$', '_');
+static MetaStringDecoder FIELD_NAME_DECODER('$', '_');
+
+Result<void, Error> WriteField(Buffer &buffer, const Field &field);
+Result<FieldPtr, Error> ReadField(Buffer &buffer);
+Result<void, Error> WriteType(Buffer &buffer, const DataType &type);
+Result<DataTypePtr, Error> ReadType(Buffer &buffer);
+
+Result<void, Error> WriteField(Buffer &buffer, const Field &field) {
+ auto encode_result =
+ FIELD_NAME_ENCODER.encode(field.name(), FIELD_NAME_ENCODINGS);
+ if (!encode_result.ok()) {
+ return Unexpected(encode_result.error());
+ }
+ EncodedMetaString encoded = std::move(encode_result.value());
+
+ // Find encoding index
+ int encoding_index = 0;
+ for (size_t i = 0; i < FIELD_NAME_ENCODINGS.size(); ++i) {
+ if (FIELD_NAME_ENCODINGS[i] == encoded.encoding) {
+ encoding_index = static_cast<int>(i);
+ break;
+ }
+ }
+
+ size_t name_size = encoded.bytes.size();
+
+ // Build header byte
+ int header = encoding_index & 0x03; // bits 0-1: encoding
+ bool big_size = name_size > static_cast<size_t>(FIELD_NAME_SIZE_THRESHOLD);
+ if (big_size) {
+ header |= (FIELD_NAME_SIZE_THRESHOLD << 2); // bits 2-5: max value
+ } else {
+ header |=
+ ((static_cast<int>(name_size) - 1) << 2); // bits 2-5: name size - 1
+ }
+ if (field.nullable()) {
+ header |= 0x40; // bit 6: nullable
+ }
+ buffer.WriteUint8(static_cast<uint8_t>(header));
+
+ if (big_size) {
+ buffer.WriteVarUint32(
+ static_cast<uint32_t>(name_size - FIELD_NAME_SIZE_THRESHOLD));
+ }
+ buffer.WriteBytes(encoded.bytes.data(),
+ static_cast<uint32_t>(encoded.bytes.size()));
+
+ return WriteType(buffer, *field.type());
+}
+
+Result<FieldPtr, Error> ReadField(Buffer &buffer) {
+ FORY_TRY(header_byte, buffer.ReadUint8());
+ int header = header_byte;
+ int encoding_index = header & 0x03;
+ int name_size_minus1 = (header >> 2) & 0x0F;
+ bool nullable = (header & 0x40) != 0;
+
+ size_t name_size;
+ if (name_size_minus1 == FIELD_NAME_SIZE_THRESHOLD) {
+ FORY_TRY(extra, buffer.ReadVarUint32());
+ name_size = extra + FIELD_NAME_SIZE_THRESHOLD;
+ } else {
+ name_size = name_size_minus1 + 1;
+ }
+
+ std::vector<uint8_t> name_bytes(name_size);
+ FORY_RETURN_NOT_OK(
+ buffer.ReadBytes(name_bytes.data(), static_cast<uint32_t>(name_size)));
+
+ MetaEncoding encoding = FIELD_NAME_ENCODINGS[encoding_index];
+ auto decode_result =
+ FIELD_NAME_DECODER.decode(name_bytes.data(), name_size, encoding);
+ if (!decode_result.ok()) {
+ return Unexpected(decode_result.error());
+ }
+ std::string name = std::move(decode_result.value());
+
+ FORY_TRY(type, ReadType(buffer));
+ return std::make_shared<Field>(std::move(name), std::move(type), nullable);
+}
+
+Result<void, Error> WriteType(Buffer &buffer, const DataType &type) {
+ buffer.WriteUint8(static_cast<uint8_t>(type.id()));
+
+ if (auto *decimal_type = dynamic_cast<const DecimalType *>(&type)) {
+ buffer.WriteUint8(static_cast<uint8_t>(decimal_type->precision()));
+ buffer.WriteUint8(static_cast<uint8_t>(decimal_type->scale()));
+ } else if (auto *list_type = dynamic_cast<const ListType *>(&type)) {
+ FORY_RETURN_NOT_OK(WriteField(buffer, *list_type->value_field()));
+ } else if (auto *map_type = dynamic_cast<const MapType *>(&type)) {
+ FORY_RETURN_NOT_OK(WriteField(buffer, *map_type->key_field()));
+ FORY_RETURN_NOT_OK(WriteField(buffer, *map_type->item_field()));
+ } else if (auto *struct_type = dynamic_cast<const StructType *>(&type)) {
+ buffer.WriteVarUint32(struct_type->num_fields());
+ for (const auto &field : struct_type->fields()) {
+ FORY_RETURN_NOT_OK(WriteField(buffer, *field));
+ }
+ }
+
+ return Result<void, Error>();
+}
+
+Result<DataTypePtr, Error> ReadType(Buffer &buffer) {
+ FORY_TRY(type_id_byte, buffer.ReadUint8());
+ TypeId type_id = static_cast<TypeId>(type_id_byte);
+
+ switch (type_id) {
+ case TypeId::BOOL:
+ return boolean();
+ case TypeId::INT8:
+ return int8();
+ case TypeId::INT16:
+ return int16();
+ case TypeId::INT32:
+ return int32();
+ case TypeId::INT64:
+ return int64();
+ case TypeId::FLOAT16:
+ return float16();
+ case TypeId::FLOAT32:
+ return float32();
+ case TypeId::FLOAT64:
+ return float64();
+ case TypeId::STRING:
+ return utf8();
+ case TypeId::BINARY:
+ return binary();
+ case TypeId::DURATION:
+ return duration();
+ case TypeId::TIMESTAMP:
+ return timestamp();
+ case TypeId::LOCAL_DATE:
+ return date32();
+ case TypeId::DECIMAL: {
+ FORY_TRY(precision, buffer.ReadUint8());
+ FORY_TRY(scale, buffer.ReadUint8());
+ return decimal(precision, scale);
+ }
+ case TypeId::LIST: {
+ FORY_TRY(value_field, ReadField(buffer));
+ return std::static_pointer_cast<DataType>(
+ std::make_shared<ListType>(std::move(value_field)));
+ }
+ case TypeId::MAP: {
+ FORY_TRY(key_field, ReadField(buffer));
+ FORY_TRY(item_field, ReadField(buffer));
+ return std::static_pointer_cast<DataType>(
+ std::make_shared<MapType>(key_field->type(), item_field->type()));
+ }
+ case TypeId::STRUCT: {
+ FORY_TRY(struct_num_fields, buffer.ReadVarUint32());
+ std::vector<FieldPtr> fields;
+ fields.reserve(struct_num_fields);
+ for (uint32_t i = 0; i < struct_num_fields; ++i) {
+ FORY_TRY(struct_field, ReadField(buffer));
+ fields.push_back(std::move(struct_field));
+ }
+ return std::static_pointer_cast<DataType>(
+ std::make_shared<StructType>(std::move(fields)));
+ }
+ default:
+ return Unexpected(Error::invalid_data("Unknown type id: " +
+ std::to_string(type_id_byte)));
+ }
+}
+
+} // anonymous namespace
+
+std::vector<uint8_t> Schema::ToBytes() const {
+ Buffer buffer;
+ ToBytes(buffer);
+ std::vector<uint8_t> result(buffer.writer_index());
+ buffer.Copy(0, buffer.writer_index(), result.data());
+ return result;
+}
+
+void Schema::ToBytes(Buffer &buffer) const {
+ buffer.WriteUint8(SCHEMA_VERSION);
+ buffer.WriteVarUint32(num_fields());
+ for (const auto &f : fields_) {
+ auto result = WriteField(buffer, *f);
+ if (!result.ok()) {
+ throw std::runtime_error(result.error().message());
+ }
+ }
+}
+
+SchemaPtr Schema::FromBytes(const std::vector<uint8_t> &bytes) {
+ Buffer buffer(const_cast<uint8_t *>(bytes.data()),
+ static_cast<uint32_t>(bytes.size()), false);
+ return FromBytes(buffer);
+}
+
+SchemaPtr Schema::FromBytes(Buffer &buffer) {
+ auto version_result = buffer.ReadUint8();
+ if (!version_result.ok()) {
+ throw std::runtime_error(version_result.error().message());
+ }
+ uint8_t version = version_result.value();
+ if (version != SCHEMA_VERSION) {
+ throw std::runtime_error(
+ "Unsupported schema version: " + std::to_string(version) +
+ ", expected: " + std::to_string(SCHEMA_VERSION));
+ }
+
+ auto num_fields_result = buffer.ReadVarUint32();
+ if (!num_fields_result.ok()) {
+ throw std::runtime_error(num_fields_result.error().message());
+ }
+ uint32_t num_fields = num_fields_result.value();
+
+ std::vector<FieldPtr> fields;
+ fields.reserve(num_fields);
+ for (uint32_t i = 0; i < num_fields; ++i) {
+ auto field_result = ReadField(buffer);
+ if (!field_result.ok()) {
+ throw std::runtime_error(field_result.error().message());
+ }
+ fields.push_back(std::move(field_result.value()));
+ }
+
+ return std::make_shared<Schema>(std::move(fields));
+}
+
+} // namespace row
+} // namespace fory
diff --git a/cpp/fory/row/schema.h b/cpp/fory/row/schema.h
index ecd6cfaa6..f6ac98646 100644
--- a/cpp/fory/row/schema.h
+++ b/cpp/fory/row/schema.h
@@ -40,7 +40,9 @@
#pragma once
#include "fory/type/type.h"
+#include "fory/util/buffer.h"
#include <climits>
+#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
@@ -518,6 +520,18 @@ public:
return Equals(*other);
}
+ /// Serialize this schema to a byte vector.
+ std::vector<uint8_t> ToBytes() const;
+
+ /// Serialize this schema to a Buffer.
+ void ToBytes(Buffer &buffer) const;
+
+ /// Deserialize a schema from a byte vector.
+ static SchemaPtr FromBytes(const std::vector<uint8_t> &bytes);
+
+ /// Deserialize a schema from a Buffer.
+ static SchemaPtr FromBytes(Buffer &buffer);
+
private:
std::vector<FieldPtr> fields_;
std::unordered_map<std::string, int> name_to_index_;
diff --git a/cpp/fory/type/type.h b/cpp/fory/type/type.h
index a2937637a..cface254e 100644
--- a/cpp/fory/type/type.h
+++ b/cpp/fory/type/type.h
@@ -101,10 +101,6 @@ enum class TypeId : int32_t {
FLOAT32_ARRAY = 36,
// one-dimensional float64 array.
FLOAT64_ARRAY = 37,
- // an arrow record batch object.
- ARROW_RECORD_BATCH = 38,
- // an arrow table object.
- ARROW_TABLE = 39,
// Unknown/polymorphic type marker.
UNKNOWN = 64,
// Bound value, typically used as a sentinel value.
diff --git a/java/fory-core/src/main/java/org/apache/fory/type/Types.java
b/java/fory-core/src/main/java/org/apache/fory/type/Types.java
index c6300b2be..88212144d 100644
--- a/java/fory-core/src/main/java/org/apache/fory/type/Types.java
+++ b/java/fory-core/src/main/java/org/apache/fory/type/Types.java
@@ -156,12 +156,6 @@ public class Types {
/** One dimensional float64 array. */
public static final int FLOAT64_ARRAY = 37;
- /** An (arrow record batch) object. */
- public static final int ARROW_RECORD_BATCH = 38;
-
- /** An (arrow table) object. */
- public static final int ARROW_TABLE = 39;
-
public static final int UNKNOWN = 63;
// Helper methods
diff --git
a/java/fory-format/src/main/java/org/apache/fory/format/type/ArrowType.java
b/java/fory-format/src/main/java/org/apache/fory/format/type/ArrowType.java
index 539371a03..37f43d67f 100644
--- a/java/fory-format/src/main/java/org/apache/fory/format/type/ArrowType.java
+++ b/java/fory-format/src/main/java/org/apache/fory/format/type/ArrowType.java
@@ -158,9 +158,7 @@ public enum ArrowType {
FORY_PRIMITIVE_DOUBLE_ARRAY(263),
FORY_STRING_ARRAY(264),
FORY_SERIALIZED_OBJECT(265),
- FORY_BUFFER(266),
- FORY_ARROW_RECORD_BATCH(267),
- FORY_ARROW_TABLE(268);
+ FORY_BUFFER(266);
private short id;
diff --git
a/java/fory-format/src/main/java/org/apache/fory/format/type/DataTypes.java
b/java/fory-format/src/main/java/org/apache/fory/format/type/DataTypes.java
index 48a7b5223..f3007bbf6 100644
--- a/java/fory-format/src/main/java/org/apache/fory/format/type/DataTypes.java
+++ b/java/fory-format/src/main/java/org/apache/fory/format/type/DataTypes.java
@@ -876,41 +876,36 @@ public class DataTypes {
}
//
============================================================================
- // Schema serialization (uses Arrow format for cross-language compatibility)
+ // Schema serialization
//
============================================================================
/**
- * Serializes a Fory Schema to bytes using Arrow's schema format.
+ * Serializes a Fory Schema to bytes.
*
* @param schema the schema to serialize
* @return serialized bytes
*/
public static byte[] serializeSchema(Schema schema) {
- org.apache.arrow.vector.types.pojo.Schema arrowSchema =
- ArrowSchemaConverter.toArrowSchema(schema);
- return arrowSchema.toByteArray();
+ return SchemaEncoder.toBytes(schema);
}
/**
- * Serializes a Fory Schema to a MemoryBuffer using Arrow's schema format.
+ * Serializes a Fory Schema to a MemoryBuffer.
*
* @param schema the schema to serialize
* @param buffer the buffer to write to
*/
public static void serializeSchema(Schema schema,
org.apache.fory.memory.MemoryBuffer buffer) {
- byte[] bytes = serializeSchema(schema);
- buffer.writeBytes(bytes);
+ SchemaEncoder.toBytes(schema, buffer);
}
/**
- * Deserializes a Fory Schema from bytes using Arrow's schema format.
+ * Deserializes a Fory Schema from bytes.
*
* @param bytes the serialized bytes
* @return the deserialized schema
*/
public static Schema deserializeSchema(byte[] bytes) {
- org.apache.arrow.vector.types.pojo.Schema arrowSchema =
-
org.apache.arrow.vector.types.pojo.Schema.deserialize(java.nio.ByteBuffer.wrap(bytes));
- return ArrowSchemaConverter.fromArrowSchema(arrowSchema);
+ return SchemaEncoder.fromBytes(bytes);
}
}
diff --git
a/java/fory-format/src/main/java/org/apache/fory/format/type/SchemaEncoder.java
b/java/fory-format/src/main/java/org/apache/fory/format/type/SchemaEncoder.java
new file mode 100644
index 000000000..a88b24811
--- /dev/null
+++
b/java/fory-format/src/main/java/org/apache/fory/format/type/SchemaEncoder.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fory.format.type;
+
+import static org.apache.fory.meta.MetaString.Encoding.ALL_TO_LOWER_SPECIAL;
+import static
org.apache.fory.meta.MetaString.Encoding.LOWER_UPPER_DIGIT_SPECIAL;
+import static org.apache.fory.meta.MetaString.Encoding.UTF_8;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.fory.memory.MemoryBuffer;
+import org.apache.fory.meta.MetaString;
+import org.apache.fory.meta.MetaStringDecoder;
+import org.apache.fory.meta.MetaStringEncoder;
+import org.apache.fory.type.Types;
+
+/**
+ * Encoder for Fory row format Schema.
+ *
+ * <p>Schema binary format:
+ *
+ * <pre>
+ * | version (1 byte) | num_fields (varint) | field1 | field2 | ... |
+ *
+ * Field format:
+ * | header (1 byte) | name_size (varint if big) | name_bytes | type_info |
+ *
+ * Header byte:
+ * - bits 0-1: encoding (0=UTF8, 1=ALL_TO_LOWER_SPECIAL,
2=LOWER_UPPER_DIGIT_SPECIAL)
+ * - bits 2-5: name size - 1 (0-15, if 15 then varint follows for larger sizes)
+ * - bit 6: nullable flag
+ * - bit 7: reserved
+ *
+ * Type info:
+ * - type_id (1 byte)
+ * - For DECIMAL: precision (1 byte) + scale (1 byte)
+ * - For LIST: element type (recursive)
+ * - For MAP: key type (recursive) + value type (recursive)
+ * - For STRUCT: num_fields (varint) + fields (recursive)
+ * </pre>
+ */
+public class SchemaEncoder {
+
+ private static final byte SCHEMA_VERSION = 1;
+ private static final int FIELD_NAME_SIZE_THRESHOLD = 15;
+
+ private static final MetaString.Encoding[] FIELD_NAME_ENCODINGS =
+ new MetaString.Encoding[] {UTF_8, ALL_TO_LOWER_SPECIAL,
LOWER_UPPER_DIGIT_SPECIAL};
+ private static final List<MetaString.Encoding> FIELD_NAME_ENCODINGS_LIST =
+ Arrays.asList(FIELD_NAME_ENCODINGS);
+
+ private static final MetaStringEncoder FIELD_NAME_ENCODER = new
MetaStringEncoder('$', '_');
+ private static final MetaStringDecoder FIELD_NAME_DECODER = new
MetaStringDecoder('$', '_');
+
+ /**
+ * Serializes a Schema to a byte array.
+ *
+ * @param schema the schema to serialize
+ * @return serialized bytes
+ */
+ public static byte[] toBytes(Schema schema) {
+ MemoryBuffer buffer = MemoryBuffer.newHeapBuffer(64);
+ toBytes(schema, buffer);
+ return buffer.getBytes(0, buffer.writerIndex());
+ }
+
+ /**
+ * Serializes a Schema to a MemoryBuffer.
+ *
+ * @param schema the schema to serialize
+ * @param buffer the buffer to write to
+ */
+ public static void toBytes(Schema schema, MemoryBuffer buffer) {
+ buffer.writeByte(SCHEMA_VERSION);
+ buffer.writeVarUint32Small7(schema.numFields());
+ for (Field field : schema.fields()) {
+ writeField(buffer, field);
+ }
+ }
+
+ /**
+ * Deserializes a Schema from a byte array.
+ *
+ * @param bytes the serialized bytes
+ * @return the deserialized schema
+ */
+ public static Schema fromBytes(byte[] bytes) {
+ MemoryBuffer buffer = MemoryBuffer.fromByteArray(bytes);
+ return fromBytes(buffer);
+ }
+
+ /**
+ * Deserializes a Schema from a MemoryBuffer.
+ *
+ * @param buffer the buffer to read from
+ * @return the deserialized schema
+ */
+ public static Schema fromBytes(MemoryBuffer buffer) {
+ byte version = buffer.readByte();
+ if (version != SCHEMA_VERSION) {
+ throw new IllegalArgumentException(
+ "Unsupported schema version: " + version + ", expected: " +
SCHEMA_VERSION);
+ }
+ int numFields = buffer.readVarUint32Small7();
+ List<Field> fields = new ArrayList<>(numFields);
+ for (int i = 0; i < numFields; i++) {
+ fields.add(readField(buffer));
+ }
+ return new Schema(fields);
+ }
+
+ private static void writeField(MemoryBuffer buffer, Field field) {
+ MetaString metaString = FIELD_NAME_ENCODER.encode(field.name(),
FIELD_NAME_ENCODINGS);
+ int encodingIndex =
FIELD_NAME_ENCODINGS_LIST.indexOf(metaString.getEncoding());
+ byte[] nameBytes = metaString.getBytes();
+ int nameSize = nameBytes.length;
+
+ // Build header byte
+ int header = encodingIndex & 0x03; // bits 0-1: encoding
+ boolean bigSize = nameSize > FIELD_NAME_SIZE_THRESHOLD;
+ if (bigSize) {
+ header |= (FIELD_NAME_SIZE_THRESHOLD << 2); // bits 2-5: max value means
varint follows
+ } else {
+ header |= ((nameSize - 1) << 2); // bits 2-5: name size - 1
+ }
+ if (field.nullable()) {
+ header |= 0x40; // bit 6: nullable
+ }
+ buffer.writeByte(header);
+
+ if (bigSize) {
+ buffer.writeVarUint32Small7(nameSize - FIELD_NAME_SIZE_THRESHOLD);
+ }
+ buffer.writeBytes(nameBytes);
+
+ writeType(buffer, field.type());
+ }
+
+ private static Field readField(MemoryBuffer buffer) {
+ int header = buffer.readByte() & 0xFF;
+ int encodingIndex = header & 0x03;
+ int nameSizeMinus1 = (header >> 2) & 0x0F;
+ boolean nullable = (header & 0x40) != 0;
+
+ int nameSize;
+ if (nameSizeMinus1 == FIELD_NAME_SIZE_THRESHOLD) {
+ nameSize = buffer.readVarUint32Small7() + FIELD_NAME_SIZE_THRESHOLD;
+ } else {
+ nameSize = nameSizeMinus1 + 1;
+ }
+
+ byte[] nameBytes = buffer.readBytes(nameSize);
+ MetaString.Encoding encoding = FIELD_NAME_ENCODINGS[encodingIndex];
+ String name = FIELD_NAME_DECODER.decode(nameBytes, encoding);
+
+ DataType type = readType(buffer);
+ return new Field(name, type, nullable);
+ }
+
+ private static void writeType(MemoryBuffer buffer, DataType type) {
+ int typeId = type.typeId();
+ buffer.writeByte(typeId);
+
+ if (type instanceof DataTypes.DecimalType) {
+ DataTypes.DecimalType decimalType = (DataTypes.DecimalType) type;
+ buffer.writeByte(decimalType.precision());
+ buffer.writeByte(decimalType.scale());
+ } else if (type instanceof DataTypes.ListType) {
+ DataTypes.ListType listType = (DataTypes.ListType) type;
+ writeField(buffer, listType.valueField());
+ } else if (type instanceof DataTypes.MapType) {
+ DataTypes.MapType mapType = (DataTypes.MapType) type;
+ writeField(buffer, mapType.keyField());
+ writeField(buffer, mapType.itemField());
+ } else if (type instanceof DataTypes.StructType) {
+ DataTypes.StructType structType = (DataTypes.StructType) type;
+ buffer.writeVarUint32Small7(structType.numFields());
+ for (Field field : structType.fields()) {
+ writeField(buffer, field);
+ }
+ }
+ }
+
+ private static DataType readType(MemoryBuffer buffer) {
+ int typeId = buffer.readByte() & 0xFF;
+
+ switch (typeId) {
+ case Types.BOOL:
+ return DataTypes.bool();
+ case Types.INT8:
+ return DataTypes.int8();
+ case Types.INT16:
+ return DataTypes.int16();
+ case Types.INT32:
+ return DataTypes.int32();
+ case Types.INT64:
+ return DataTypes.int64();
+ case Types.FLOAT16:
+ return DataTypes.float16();
+ case Types.FLOAT32:
+ return DataTypes.float32();
+ case Types.FLOAT64:
+ return DataTypes.float64();
+ case Types.STRING:
+ return DataTypes.utf8();
+ case Types.BINARY:
+ return DataTypes.binary();
+ case Types.DURATION:
+ return DataTypes.duration();
+ case Types.TIMESTAMP:
+ return DataTypes.timestamp();
+ case Types.LOCAL_DATE:
+ return DataTypes.date32();
+ case Types.DECIMAL:
+ int precision = buffer.readByte() & 0xFF;
+ int scale = buffer.readByte() & 0xFF;
+ return DataTypes.decimal(precision, scale);
+ case Types.LIST:
+ Field valueField = readField(buffer);
+ return new DataTypes.ListType(valueField);
+ case Types.MAP:
+ Field keyField = readField(buffer);
+ Field itemField = readField(buffer);
+ return new DataTypes.MapType(keyField.type(), itemField.type());
+ case Types.STRUCT:
+ int numFields = buffer.readVarUint32Small7();
+ List<Field> fields = new ArrayList<>(numFields);
+ for (int i = 0; i < numFields; i++) {
+ fields.add(readField(buffer));
+ }
+ return new DataTypes.StructType(fields);
+ default:
+ throw new IllegalArgumentException("Unknown type id: " + typeId);
+ }
+ }
+}
diff --git
a/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowSerializers.java
b/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowSerializers.java
deleted file mode 100644
index ec482cbcc..000000000
---
a/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowSerializers.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.fory.format.vectorized;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.WritableByteChannel;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.arrow.vector.ipc.WriteChannel;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
-import org.apache.arrow.vector.ipc.message.IpcOption;
-import org.apache.arrow.vector.ipc.message.MessageSerializer;
-import org.apache.fory.Fory;
-import org.apache.fory.io.MemoryBufferReadableChannel;
-import org.apache.fory.io.MemoryBufferWritableChannel;
-import org.apache.fory.io.MockWritableChannel;
-import org.apache.fory.memory.MemoryBuffer;
-import org.apache.fory.memory.MemoryUtils;
-import org.apache.fory.memory.Platform;
-import org.apache.fory.resolver.XtypeResolver;
-import org.apache.fory.serializer.BufferObject;
-import
org.apache.fory.serializer.Serializers.CrossLanguageCompatibleSerializer;
-import org.apache.fory.type.Types;
-
-/** Serializers for apache arrow. */
-public class ArrowSerializers {
-
- /** Use {@link ArrowTableSerializer} is more recommended. */
- public static class VectorSchemaRootSerializer
- extends CrossLanguageCompatibleSerializer<VectorSchemaRoot> {
- private static final BufferAllocator defaultAllocator =
- ArrowUtils.allocator.newChildAllocator(
- "arrow-vector-schema-root-reader", 64, Long.MAX_VALUE);
- private final BufferAllocator allocator;
-
- public VectorSchemaRootSerializer(Fory fory) {
- this(fory, defaultAllocator);
- }
-
- public VectorSchemaRootSerializer(Fory fory, BufferAllocator allocator) {
- super(fory, VectorSchemaRoot.class);
- this.allocator = allocator;
- }
-
- @Override
- public void write(MemoryBuffer buffer, VectorSchemaRoot root) {
- fory.writeBufferObject(buffer, new VectorSchemaRootBufferObject(root));
- }
-
- @Override
- public VectorSchemaRoot read(MemoryBuffer buffer) {
- MemoryBuffer buf = fory.readBufferObject(buffer);
- try {
- ReadableByteChannel channel = new MemoryBufferReadableChannel(buf);
- ArrowStreamReader reader = new ArrowStreamReader(channel, allocator);
- // FIXME close reader will close `root`.
- // since there is no possibility for resource leak, we can skip
`reader.close`
- // and let the user to close `root`.
- VectorSchemaRoot root = reader.getVectorSchemaRoot();
- // Only single record batch are supported for now.
- // Since call loadNextBatch again will clear previous loaded data, so
that we can't
- // check `reader.loadNextBatch()` again to check whether there is any
batch left.
- reader.loadNextBatch();
- return root;
- } catch (Exception e) {
- throw new RuntimeException("Unable to read a record batch message", e);
- }
- }
- }
-
- private static class VectorSchemaRootBufferObject implements BufferObject {
- private final int totalBytes;
- private final VectorSchemaRoot root;
-
- VectorSchemaRootBufferObject(VectorSchemaRoot root) {
- this.root = root;
- MockWritableChannel mockWritableChannel = new MockWritableChannel();
- write(root, mockWritableChannel);
- totalBytes = mockWritableChannel.totalBytes();
- }
-
- @Override
- public int totalBytes() {
- return totalBytes;
- }
-
- @Override
- public void writeTo(MemoryBuffer buffer) {
- write(root, new MemoryBufferWritableChannel(buffer));
- }
-
- private static void write(VectorSchemaRoot root, WritableByteChannel
byteChannel) {
- try (ArrowStreamWriter writer = new ArrowStreamWriter(root, null,
byteChannel)) {
- writer.writeBatch();
- } catch (IOException e) {
- Platform.throwException(e);
- }
- }
-
- @Override
- public MemoryBuffer toBuffer() {
- MemoryBuffer buffer = MemoryUtils.buffer(totalBytes);
- write(root, new MemoryBufferWritableChannel(buffer));
- return buffer.slice(0, buffer.writerIndex());
- }
- }
-
- public static class ArrowTableBufferObject implements BufferObject {
- private final ArrowTable table;
- private final int totalBytes;
-
- public ArrowTableBufferObject(ArrowTable table) {
- this.table = table;
- MockWritableChannel mockWritableChannel = new MockWritableChannel();
- write(table, mockWritableChannel);
- totalBytes = mockWritableChannel.totalBytes();
- }
-
- @Override
- public int totalBytes() {
- return totalBytes;
- }
-
- @Override
- public void writeTo(MemoryBuffer buffer) {
- write(table, new MemoryBufferWritableChannel(buffer));
- }
-
- private static void write(ArrowTable table, WritableByteChannel
byteChannel) {
- try (WriteChannel channel = new WriteChannel(byteChannel)) {
- MessageSerializer.serialize(channel, table.getSchema());
- for (ArrowRecordBatch recordBatch : table.getRecordBatches()) {
- MessageSerializer.serialize(channel, recordBatch);
- }
- ArrowStreamWriter.writeEndOfStream(channel, new IpcOption());
- } catch (IOException e) {
- Platform.throwException(e);
- }
- }
-
- @Override
- public MemoryBuffer toBuffer() {
- MemoryBuffer buffer = MemoryUtils.buffer(totalBytes);
- write(table, new MemoryBufferWritableChannel(buffer));
- return buffer.slice(0, buffer.writerIndex());
- }
- }
-
- public static void registerSerializers(Fory fory) {
- if (fory.isCrossLanguage()) {
- XtypeResolver resolver = fory.getXtypeResolver();
- resolver.registerForyType(
- ArrowTable.class, new ArrowTableSerializer(fory), Types.ARROW_TABLE);
- resolver.registerForyType(
- VectorSchemaRoot.class, new VectorSchemaRootSerializer(fory),
Types.ARROW_RECORD_BATCH);
- } else {
- fory.registerSerializer(ArrowTable.class, new
ArrowTableSerializer(fory));
- fory.registerSerializer(VectorSchemaRoot.class, new
VectorSchemaRootSerializer(fory));
- }
- }
-}
diff --git
a/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowTable.java
b/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowTable.java
deleted file mode 100644
index a8b530327..000000000
---
a/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowTable.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.fory.format.vectorized;
-
-import java.util.Iterator;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorLoader;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.fory.util.Preconditions;
-
-/** A custom pyarrow-style arrow table by attach {@link Schema} to {@link
ArrowRecordBatch}. */
-public class ArrowTable {
- private static final BufferAllocator tableBufferAllocator =
- ArrowUtils.allocator.newChildAllocator("table_buffer_allocator", 64,
Long.MAX_VALUE);
- private final Schema schema;
- private final BufferAllocator allocator;
- private Iterable<ArrowRecordBatch> recordBatches;
- private Iterator<ArrowRecordBatch> batchIterator;
- private VectorSchemaRoot root;
-
- public ArrowTable(Schema schema, Iterable<ArrowRecordBatch> recordBatches) {
- this(schema, recordBatches, tableBufferAllocator);
- }
-
- public ArrowTable(
- Schema schema, Iterable<ArrowRecordBatch> recordBatches, BufferAllocator
allocator) {
- this.schema = schema;
- this.recordBatches = recordBatches;
- this.allocator = allocator;
- }
-
- public Schema getSchema() {
- return schema;
- }
-
- public Iterable<ArrowRecordBatch> getRecordBatches() {
- return recordBatches;
- }
-
- public VectorSchemaRoot toVectorSchemaRoot() {
- return toVectorSchemaRoot(false);
- }
-
- public VectorSchemaRoot toVectorSchemaRoot(boolean reload) {
- if (!reload) {
- Preconditions.checkArgument(batchIterator == null);
- }
- batchIterator = recordBatches.iterator();
- root = VectorSchemaRoot.create(schema, allocator);
- return root;
- }
-
- public boolean loadNextBatch() {
- VectorLoader loader = new VectorLoader(root);
- if (batchIterator.hasNext()) {
- loader.load(batchIterator.next());
- return true;
- } else {
- return false;
- }
- }
-}
diff --git
a/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowTableSerializer.java
b/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowTableSerializer.java
deleted file mode 100644
index 1875f38ad..000000000
---
a/java/fory-format/src/main/java/org/apache/fory/format/vectorized/ArrowTableSerializer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.fory.format.vectorized;
-
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
-import org.apache.fory.Fory;
-import org.apache.fory.io.MemoryBufferReadableChannel;
-import org.apache.fory.memory.MemoryBuffer;
-import org.apache.fory.memory.Platform;
-import org.apache.fory.serializer.Serializers;
-
-/** Serializers for {@link ArrowTable}. */
-public class ArrowTableSerializer
- extends Serializers.CrossLanguageCompatibleSerializer<ArrowTable> {
- private static final BufferAllocator defaultAllocator =
- ArrowUtils.allocator.newChildAllocator("arrow-table-reader", 64,
Long.MAX_VALUE);
- private final BufferAllocator allocator;
-
- public ArrowTableSerializer(Fory fory) {
- this(fory, defaultAllocator);
- }
-
- public ArrowTableSerializer(Fory fory, BufferAllocator allocator) {
- super(fory, ArrowTable.class);
- this.allocator = allocator;
- }
-
- @Override
- public void write(MemoryBuffer buffer, ArrowTable value) {
- fory.writeBufferObject(buffer, new
ArrowSerializers.ArrowTableBufferObject(value));
- }
-
- @Override
- public ArrowTable read(MemoryBuffer buffer) {
- MemoryBuffer buf = fory.readBufferObject(buffer);
- List<ArrowRecordBatch> recordBatches = new ArrayList<>();
- try {
- ReadableByteChannel channel = new MemoryBufferReadableChannel(buf);
- ArrowStreamReader reader = new ArrowStreamReader(channel, allocator);
- VectorSchemaRoot root = reader.getVectorSchemaRoot();
- while (reader.loadNextBatch()) {
- recordBatches.add(new VectorUnloader(root).getRecordBatch());
- }
- return new ArrowTable(root.getSchema(), recordBatches, allocator);
- } catch (Exception e) {
- Platform.throwException(e);
- throw new RuntimeException("unreachable");
- }
- }
-}
diff --git
a/java/fory-format/src/test/java/org/apache/fory/format/CrossLanguageTest.java
b/java/fory-format/src/test/java/org/apache/fory/format/CrossLanguageTest.java
index 40f5f6c6f..f6d12c288 100644
---
a/java/fory-format/src/test/java/org/apache/fory/format/CrossLanguageTest.java
+++
b/java/fory-format/src/test/java/org/apache/fory/format/CrossLanguageTest.java
@@ -19,56 +19,26 @@
package org.apache.fory.format;
-import static
org.apache.fory.format.vectorized.ArrowSerializersTest.assertRecordBatchEqual;
-import static
org.apache.fory.format.vectorized.ArrowSerializersTest.assertTableEqual;
-import static
org.apache.fory.format.vectorized.ArrowUtilsTest.createVectorSchemaRoot;
-
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
-import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import lombok.Data;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.arrow.vector.ipc.WriteChannel;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
-import org.apache.arrow.vector.ipc.message.IpcOption;
-import org.apache.arrow.vector.types.Types;
-import org.apache.arrow.vector.types.pojo.FieldType;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.fory.Fory;
-import org.apache.fory.config.Language;
import org.apache.fory.format.encoder.Encoders;
import org.apache.fory.format.encoder.RowEncoder;
import org.apache.fory.format.row.binary.BinaryRow;
import org.apache.fory.format.type.DataTypes;
-import org.apache.fory.format.vectorized.ArrowSerializers;
-import org.apache.fory.format.vectorized.ArrowTable;
-import org.apache.fory.format.vectorized.ArrowUtils;
-import org.apache.fory.format.vectorized.ArrowWriter;
-import org.apache.fory.io.MemoryBufferOutputStream;
import org.apache.fory.logging.Logger;
import org.apache.fory.logging.LoggerFactory;
import org.apache.fory.memory.MemoryBuffer;
import org.apache.fory.memory.MemoryUtils;
-import org.apache.fory.serializer.BufferObject;
import org.apache.fory.test.TestUtils;
import org.testng.Assert;
-import org.testng.SkipException;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -81,7 +51,7 @@ public class CrossLanguageTest {
@BeforeClass
public void isPythonInstalled() {
- throw new SkipException("disable cross language test temporarily");
+ TestUtils.verifyPyforyInstalled();
}
@Data
@@ -174,131 +144,6 @@ public class CrossLanguageTest {
Assert.assertEquals(foo, encoder.fromRow(newRow));
}
- public void testRecordBatchBasic() throws IOException {
- BufferAllocator alloc = new RootAllocator(Long.MAX_VALUE);
- org.apache.arrow.vector.types.pojo.Field arrowField =
- org.apache.arrow.vector.types.pojo.Field.nullable(
- "testField", Types.MinorType.TINYINT.getType());
- TinyIntVector vector =
- new TinyIntVector(
- "testField",
FieldType.nullable(Types.MinorType.TINYINT.getType()), alloc);
- VectorSchemaRoot root =
- new VectorSchemaRoot(
- Collections.singletonList(arrowField),
Collections.singletonList(vector));
- Path dataFile = Files.createTempFile("foo", "data");
- MemoryBuffer buffer = MemoryUtils.buffer(128);
- try (ArrowStreamWriter writer =
- new ArrowStreamWriter(root, null, new
MemoryBufferOutputStream(buffer))) {
- writer.start();
- for (int i = 0; i < 1; i++) {
- vector.allocateNew(16);
- for (int j = 0; j < 8; j++) {
- vector.set(j, j + i);
- vector.set(j + 8, 0, (byte) (j + i));
- }
- vector.setValueCount(16);
- root.setRowCount(16);
- writer.writeBatch();
- }
- writer.end();
- }
- Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
- ImmutableList<String> command =
- ImmutableList.of(
- PYTHON_EXECUTABLE,
- "-m",
- PYTHON_MODULE,
- "test_record_batch_basic",
- dataFile.toAbsolutePath().toString());
- Assert.assertTrue(executeCommand(command, 30));
- }
-
- public void testRecordBatchWriter() throws IOException {
- Foo foo = Foo.create();
- RowEncoder<Foo> encoder = Encoders.bean(Foo.class);
- Path dataFile = Files.createTempFile("foo", "data");
- MemoryBuffer buffer = MemoryUtils.buffer(128);
- ImmutableList<String> command =
- ImmutableList.of(
- PYTHON_EXECUTABLE,
- "-m",
- PYTHON_MODULE,
- "test_record_batch",
- dataFile.toAbsolutePath().toString());
- int numRows = 128;
- {
- VectorSchemaRoot root =
ArrowUtils.createVectorSchemaRoot(encoder.schema());
- ArrowWriter arrowWriter = new ArrowWriter(root);
- try (ArrowStreamWriter writer =
- new ArrowStreamWriter(root, null, new
MemoryBufferOutputStream(buffer))) {
- writer.start();
- for (int i = 0; i < numRows; i++) {
- BinaryRow row = encoder.toRow(foo);
- arrowWriter.write(row);
- }
- arrowWriter.finish();
- writer.writeBatch();
- writer.end();
- }
- Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
- Assert.assertTrue(executeCommand(command, 30));
- }
- {
- buffer.writerIndex(0);
- ArrowWriter arrowWriter = ArrowUtils.createArrowWriter(encoder.schema());
- for (int i = 0; i < numRows; i++) {
- BinaryRow row = encoder.toRow(foo);
- arrowWriter.write(row);
- }
- ArrowRecordBatch recordBatch = arrowWriter.finishAsRecordBatch();
- DataTypes.serializeSchema(encoder.schema(), buffer);
- ArrowUtils.serializeRecordBatch(recordBatch, buffer);
- arrowWriter.reset();
- ArrowStreamWriter.writeEndOfStream(
- new WriteChannel(Channels.newChannel(new
MemoryBufferOutputStream(buffer))),
- new IpcOption());
- Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
- Assert.assertTrue(executeCommand(command, 30));
- }
- }
-
- public void testWriteMultiRecordBatch() throws IOException {
- Foo foo = Foo.create();
- RowEncoder<Foo> encoder = Encoders.bean(Foo.class);
- Path schemaFile = Files.createTempFile("foo", "schema");
- Path dataFile = Files.createTempFile("foo", "data");
- ImmutableList<String> command =
- ImmutableList.of(
- PYTHON_EXECUTABLE,
- "-m",
- PYTHON_MODULE,
- "test_write_multi_record_batch",
- schemaFile.toAbsolutePath().toString(),
- dataFile.toAbsolutePath().toString());
- {
- MemoryBuffer buffer = MemoryUtils.buffer(128);
- buffer.writerIndex(0);
- DataTypes.serializeSchema(encoder.schema(), buffer);
- Files.write(schemaFile, buffer.getBytes(0, buffer.writerIndex()));
- }
- ArrowWriter arrowWriter = ArrowUtils.createArrowWriter(encoder.schema());
- int numBatches = 5;
- for (int i = 0; i < numBatches; i++) {
- int numRows = 128;
- for (int j = 0; j < numRows; j++) {
- BinaryRow row = encoder.toRow(foo);
- arrowWriter.write(row);
- }
- ArrowRecordBatch recordBatch = arrowWriter.finishAsRecordBatch();
- MemoryBuffer buffer = MemoryUtils.buffer(128);
- ArrowUtils.serializeRecordBatch(recordBatch, buffer);
- arrowWriter.reset();
- Files.write(
- dataFile, buffer.getBytes(0, buffer.writerIndex()),
StandardOpenOption.TRUNCATE_EXISTING);
- Assert.assertTrue(executeCommand(command, 30));
- }
- }
-
/** Keep this in sync with `foo_schema` in test_cross_language.py */
@Data
public static class Foo {
@@ -352,111 +197,4 @@ public class CrossLanguageTest {
return TestUtils.executeCommand(
command, waitTimeoutSeconds,
ImmutableMap.of("ENABLE_CROSS_LANGUAGE_TESTS", "true"));
}
-
- @Test
- public void testSerializeArrowInBand() throws Exception {
- Fory fory =
- Fory.builder()
- .withLanguage(Language.XLANG)
- .withRefTracking(true)
- .requireClassRegistration(false)
- .build();
- ArrowSerializers.registerSerializers(fory);
- MemoryBuffer buffer = MemoryUtils.buffer(32);
- int size = 2000;
- VectorSchemaRoot root = createVectorSchemaRoot(size);
- fory.serialize(buffer, root);
- Schema schema = root.getSchema();
- List<ArrowRecordBatch> recordBatches = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- VectorUnloader unloader = new VectorUnloader(root);
- recordBatches.add(unloader.getRecordBatch());
- }
- ArrowTable table = new ArrowTable(schema, recordBatches);
- fory.serialize(buffer, table);
- assertRecordBatchEqual((VectorSchemaRoot) fory.deserialize(buffer), root);
- assertTableEqual((ArrowTable) fory.deserialize(buffer), table);
-
- Path dataFile = Files.createTempFile("test_serialize_arrow_in_band",
"data");
- Files.write(dataFile, buffer.getBytes(0, buffer.writerIndex()));
- ImmutableList<String> command =
- ImmutableList.of(
- PYTHON_EXECUTABLE,
- "-m",
- PYTHON_MODULE,
- "test_serialize_arrow_in_band",
- dataFile.toAbsolutePath().toString());
- Assert.assertTrue(executeCommand(command, 30));
-
- MemoryBuffer buffer2 = MemoryUtils.wrap(Files.readAllBytes(dataFile));
- assertRecordBatchEqual((VectorSchemaRoot) fory.deserialize(buffer2), root);
- assertTableEqual((ArrowTable) fory.deserialize(buffer2), table);
- }
-
- @Test
- public void testSerializeArrowOutOfBand() throws Exception {
- List<BufferObject> bufferObjects = new ArrayList<>();
- Fory fory =
- Fory.builder()
- .withLanguage(Language.XLANG)
- .withRefTracking(true)
- .requireClassRegistration(false)
- .build();
- ArrowSerializers.registerSerializers(fory);
-
- MemoryBuffer buffer = MemoryUtils.buffer(32);
- int size = 2000;
- VectorSchemaRoot root = createVectorSchemaRoot(size);
- Schema schema = root.getSchema();
- List<ArrowRecordBatch> recordBatches = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- VectorUnloader unloader = new VectorUnloader(root);
- recordBatches.add(unloader.getRecordBatch());
- }
- ArrowTable table = new ArrowTable(schema, recordBatches);
- fory.serialize(
- buffer,
- Arrays.asList(root, table),
- e -> {
- bufferObjects.add(e);
- return false;
- });
- List<MemoryBuffer> buffers =
-
bufferObjects.stream().map(BufferObject::toBuffer).collect(Collectors.toList());
- List<?> objects = (List<?>) fory.deserialize(buffer, buffers);
- Assert.assertNotNull(objects);
- assertRecordBatchEqual((VectorSchemaRoot) objects.get(0), root);
- assertTableEqual((ArrowTable) objects.get(1), table);
-
- Path intBandDataFile =
- Files.createTempFile("test_serialize_arrow_out_of_band_",
"in_band.data");
- Files.write(intBandDataFile, buffer.getBytes(0, buffer.writerIndex()));
- Path outOfBandDataFile =
- Files.createTempFile("test_serialize_arrow_out_of_band",
"out_of_band.data");
- MemoryBuffer outOfBandBuffer = MemoryUtils.buffer(32);
- outOfBandBuffer.writeInt32(bufferObjects.get(0).totalBytes());
- outOfBandBuffer.writeInt32(bufferObjects.get(1).totalBytes());
- bufferObjects.get(0).writeTo(outOfBandBuffer);
- bufferObjects.get(1).writeTo(outOfBandBuffer);
- Files.write(outOfBandDataFile, outOfBandBuffer.getBytes(0,
outOfBandBuffer.writerIndex()));
- ImmutableList<String> command =
- ImmutableList.of(
- PYTHON_EXECUTABLE,
- "-m",
- PYTHON_MODULE,
- "test_serialize_arrow_out_of_band",
- intBandDataFile.toAbsolutePath().toString(),
- outOfBandDataFile.toAbsolutePath().toString());
- Assert.assertTrue(executeCommand(command, 30));
-
- MemoryBuffer intBandBuffer =
MemoryUtils.wrap(Files.readAllBytes(intBandDataFile));
- outOfBandBuffer = MemoryUtils.wrap(Files.readAllBytes(outOfBandDataFile));
- int len1 = outOfBandBuffer.readInt32();
- int len2 = outOfBandBuffer.readInt32();
- buffers = Arrays.asList(outOfBandBuffer.slice(8, len1),
outOfBandBuffer.slice(8 + len1, len2));
- objects = (List<?>) fory.deserialize(intBandBuffer, buffers);
- Assert.assertNotNull(objects);
- assertRecordBatchEqual((VectorSchemaRoot) objects.get(0), root);
- assertTableEqual((ArrowTable) objects.get(1), table);
- }
}
diff --git
a/java/fory-format/src/test/java/org/apache/fory/format/type/SchemaEncoderTest.java
b/java/fory-format/src/test/java/org/apache/fory/format/type/SchemaEncoderTest.java
new file mode 100644
index 000000000..e98f3e696
--- /dev/null
+++
b/java/fory-format/src/test/java/org/apache/fory/format/type/SchemaEncoderTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fory.format.type;
+
+import static org.testng.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.testng.annotations.Test;
+
+public class SchemaEncoderTest {
+
+ @Test
+ public void testSerializeSimpleSchema() {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("id", DataTypes.int32(), true),
+ new Field("name", DataTypes.utf8(), true),
+ new Field("score", DataTypes.float64(), true),
+ new Field("active", DataTypes.bool(), true)));
+
+ byte[] bytes = SchemaEncoder.toBytes(schema);
+ Schema deserialized = SchemaEncoder.fromBytes(bytes);
+
+ assertEquals(deserialized, schema);
+ assertEquals(deserialized.numFields(), 4);
+ assertEquals(deserialized.field(0).name(), "id");
+ assertEquals(deserialized.field(1).name(), "name");
+ assertEquals(deserialized.field(2).name(), "score");
+ assertEquals(deserialized.field(3).name(), "active");
+ }
+
+ @Test
+ public void testSerializeNestedSchema() {
+ DataType addressType =
+ DataTypes.struct(
+ new Field("street", DataTypes.utf8(), true),
+ new Field("city", DataTypes.utf8(), true),
+ new Field("zip", DataTypes.int32(), true));
+
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("id", DataTypes.int32(), true),
+ new Field("name", DataTypes.utf8(), true),
+ new Field("address", addressType, true)));
+
+ byte[] bytes = SchemaEncoder.toBytes(schema);
+ Schema deserialized = SchemaEncoder.fromBytes(bytes);
+
+ assertEquals(deserialized, schema);
+ assertEquals(deserialized.numFields(), 3);
+ assertEquals(deserialized.field(2).name(), "address");
+ assertEquals(deserialized.field(2).type().typeId(), DataTypes.TYPE_STRUCT);
+ }
+
+ @Test
+ public void testSerializeListType() {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("id", DataTypes.int32(), true),
+ new Field("tags", DataTypes.list(DataTypes.utf8()), true)));
+
+ byte[] bytes = SchemaEncoder.toBytes(schema);
+ Schema deserialized = SchemaEncoder.fromBytes(bytes);
+
+ assertEquals(deserialized, schema);
+ }
+
+ @Test
+ public void testSerializeMapType() {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("id", DataTypes.int32(), true),
+ new Field("metadata", DataTypes.map(DataTypes.utf8(),
DataTypes.int32()), true)));
+
+ byte[] bytes = SchemaEncoder.toBytes(schema);
+ Schema deserialized = SchemaEncoder.fromBytes(bytes);
+
+ assertEquals(deserialized, schema);
+ }
+
+ @Test
+ public void testSerializeDecimalType() {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("id", DataTypes.int32(), true),
+ new Field("amount", DataTypes.decimal(18, 2), true)));
+
+ byte[] bytes = SchemaEncoder.toBytes(schema);
+ Schema deserialized = SchemaEncoder.fromBytes(bytes);
+
+ assertEquals(deserialized, schema);
+ DataTypes.DecimalType decimalType = (DataTypes.DecimalType)
deserialized.field(1).type();
+ assertEquals(decimalType.precision(), 18);
+ assertEquals(decimalType.scale(), 2);
+ }
+
+ @Test
+ public void testSerializeWithNullability() {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("id", DataTypes.int32(), true),
+ new Field("name", DataTypes.utf8(), true),
+ new Field("score", DataTypes.float64(), false)));
+
+ byte[] bytes = SchemaEncoder.toBytes(schema);
+ Schema deserialized = SchemaEncoder.fromBytes(bytes);
+
+ assertEquals(deserialized, schema);
+ assertEquals(deserialized.field(0).nullable(), true);
+ assertEquals(deserialized.field(1).nullable(), true);
+ assertEquals(deserialized.field(2).nullable(), false);
+ }
+
+ @Test
+ public void testRoundTripWithDataTypes() {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ new Field("f1", DataTypes.bool(), true),
+ new Field("f2", DataTypes.int8(), true),
+ new Field("f3", DataTypes.int16(), true),
+ new Field("f4", DataTypes.int32(), true),
+ new Field("f5", DataTypes.int64(), true),
+ new Field("f6", DataTypes.float32(), true),
+ new Field("f7", DataTypes.float64(), true),
+ new Field("f8", DataTypes.utf8(), true),
+ new Field("f9", DataTypes.binary(), true),
+ new Field("f10", DataTypes.date32(), true),
+ new Field("f11", DataTypes.timestamp(), true),
+ new Field("f12", DataTypes.duration(), true)));
+
+ byte[] bytes = DataTypes.serializeSchema(schema);
+ Schema deserialized = DataTypes.deserializeSchema(bytes);
+
+ assertEquals(deserialized, schema);
+ }
+}
diff --git
a/java/fory-format/src/test/java/org/apache/fory/format/vectorized/ArrowSerializersTest.java
b/java/fory-format/src/test/java/org/apache/fory/format/vectorized/ArrowSerializersTest.java
deleted file mode 100644
index 5300ef1c8..000000000
---
a/java/fory-format/src/test/java/org/apache/fory/format/vectorized/ArrowSerializersTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.fory.format.vectorized;
-
-import static
org.apache.fory.format.vectorized.ArrowUtilsTest.createVectorSchemaRoot;
-import static org.testng.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-import org.apache.arrow.vector.VectorSchemaRoot;
-import org.apache.arrow.vector.VectorUnloader;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
-import org.apache.arrow.vector.types.pojo.Schema;
-import org.apache.fory.Fory;
-import org.apache.fory.config.Language;
-import org.apache.fory.io.MemoryBufferReadableChannel;
-import org.apache.fory.io.MemoryBufferWritableChannel;
-import org.apache.fory.memory.MemoryBuffer;
-import org.apache.fory.memory.MemoryUtils;
-import org.apache.fory.resolver.ClassResolver;
-import org.apache.fory.serializer.BufferObject;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class ArrowSerializersTest {
- @Test
- public void testRegisterArrowSerializer() throws Exception {
- Fory fory = Fory.builder().withLanguage(Language.JAVA).build();
- ClassResolver classResolver = fory.getClassResolver();
- ArrowSerializers.registerSerializers(fory);
- assertEquals(classResolver.getSerializerClass(ArrowTable.class),
ArrowTableSerializer.class);
- assertEquals(
- classResolver.getSerializerClass(VectorSchemaRoot.class),
- ArrowSerializers.VectorSchemaRootSerializer.class);
- }
-
- @Test
- public void testArrowTableBufferObject() throws IOException {
- int size = 200;
- VectorSchemaRoot root = createVectorSchemaRoot(size);
- Schema schema = root.getSchema();
- List<ArrowRecordBatch> recordBatches = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- VectorUnloader unloader = new VectorUnloader(root);
- recordBatches.add(unloader.getRecordBatch());
- }
- ArrowTable table = new ArrowTable(schema, recordBatches);
- ArrowSerializers.ArrowTableBufferObject o = new
ArrowSerializers.ArrowTableBufferObject(table);
- MemoryBuffer buf = o.toBuffer();
- ReadableByteChannel channel = new MemoryBufferReadableChannel(buf);
- ArrowStreamReader reader = new ArrowStreamReader(channel,
ArrowUtils.allocator);
- VectorSchemaRoot newRoot = reader.getVectorSchemaRoot();
- while (reader.loadNextBatch()) {
- recordBatches.add(new VectorUnloader(root).getRecordBatch());
- }
- ArrowTable newTable = new ArrowTable(root.getSchema(), recordBatches,
ArrowUtils.allocator);
- assertTableEqual(newTable, table);
- }
-
- @Test
- public void testWriteVectorSchemaRoot() throws IOException {
- Collection<BufferObject> bufferObjects = new ArrayList<>();
- Fory fory = Fory.builder().requireClassRegistration(false).build();
- ArrowSerializers.registerSerializers(fory);
- int size = 2000;
- VectorSchemaRoot root = ArrowUtilsTest.createVectorSchemaRoot(size);
- Assert.assertEquals(
-
fory.getClassResolver().getSerializer(VectorSchemaRoot.class).getClass(),
- ArrowSerializers.VectorSchemaRootSerializer.class);
- byte[] serializedBytes = fory.serialize(root, e -> !bufferObjects.add(e));
- Assert.assertFalse(bufferObjects.isEmpty());
- List<MemoryBuffer> buffers =
-
bufferObjects.stream().map(BufferObject::toBuffer).collect(Collectors.toList());
- MemoryBuffer buffer = buffers.get(0);
-
- MemoryBuffer buffer2 = MemoryUtils.buffer(32);
- try (ArrowStreamWriter writer =
- new ArrowStreamWriter(root, null, new
MemoryBufferWritableChannel(buffer2))) {
- // this will make root empty.
- writer.writeBatch();
- }
- Assert.assertEquals(buffer.size(), buffer2.writerIndex());
- Assert.assertTrue(buffer.equalTo(buffer2, 0, 0, buffer.size()));
-
- VectorSchemaRoot newRoot = (VectorSchemaRoot)
fory.deserialize(serializedBytes, buffers);
- assertRecordBatchEqual(newRoot, root);
-
- // test in band serialization.
- fory = Fory.builder().requireClassRegistration(false).build();
- ArrowSerializers.registerSerializers(fory);
- newRoot = (VectorSchemaRoot) fory.deserialize(fory.serialize(root));
- assertRecordBatchEqual(newRoot, root);
- }
-
- @Test
- public void testWriteArrowTable() throws IOException {
- Collection<BufferObject> bufferObjects = new ArrayList<>();
- Fory fory = Fory.builder().requireClassRegistration(false).build();
- ArrowSerializers.registerSerializers(fory);
- int size = 2000;
- VectorSchemaRoot root = ArrowUtilsTest.createVectorSchemaRoot(size);
- Schema schema = root.getSchema();
- List<ArrowRecordBatch> recordBatches = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- VectorUnloader unloader = new VectorUnloader(root);
- recordBatches.add(unloader.getRecordBatch());
- }
- ArrowTable table = new ArrowTable(schema, recordBatches);
- Assert.assertEquals(
- fory.getClassResolver().getSerializer(ArrowTable.class).getClass(),
- ArrowTableSerializer.class);
- byte[] serializedData = fory.serialize(table, e -> !bufferObjects.add(e));
- List<MemoryBuffer> buffers =
-
bufferObjects.stream().map(BufferObject::toBuffer).collect(Collectors.toList());
- Assert.assertEquals(bufferObjects.size(), 1);
-
- ArrowTable newTable = (ArrowTable) fory.deserialize(serializedData,
buffers);
- assertTableEqual(newTable, table);
-
- // test in band serialization.
- fory = Fory.builder().requireClassRegistration(false).build();
- ArrowSerializers.registerSerializers(fory);
- newTable = (ArrowTable) fory.deserialize(fory.serialize(table));
- assertTableEqual(newTable, table);
- }
-
- public static void assertRecordBatchEqual(VectorSchemaRoot root1,
VectorSchemaRoot root2) {
- Assert.assertEquals(root1.getSchema(), root2.getSchema());
- Assert.assertEquals(root1.getRowCount(), root2.getRowCount());
- for (int i = 0; i < root2.getFieldVectors().size(); i++) {
- for (int j = 0; j < root2.getRowCount(); j += root2.getRowCount() / 100)
{
- Assert.assertEquals(
- root1.getFieldVectors().get(i).getObject(j),
- root2.getFieldVectors().get(i).getObject(j));
- }
- }
- }
-
- public static void assertTableEqual(ArrowTable t1, ArrowTable t2) {
- VectorSchemaRoot root1 = t1.toVectorSchemaRoot(true);
- VectorSchemaRoot root2 = t2.toVectorSchemaRoot(true);
- assertEquals(root1.getSchema(), t2.getSchema());
- while (t1.loadNextBatch()) {
- t2.loadNextBatch();
- Assert.assertEquals(root1.getRowCount(), root2.getRowCount());
- for (int i = 0; i < root2.getFieldVectors().size(); i++) {
- for (int j = 0; j < root2.getRowCount(); j += root2.getRowCount() /
100) {
- Assert.assertEquals(
- root1.getFieldVectors().get(i).getObject(j),
- root2.getFieldVectors().get(i).getObject(j));
- }
- }
- }
- }
-}
diff --git a/python/pyfory/_registry.py b/python/pyfory/_registry.py
index 3ecb90f61..7590cf238 100644
--- a/python/pyfory/_registry.py
+++ b/python/pyfory/_registry.py
@@ -301,21 +301,6 @@ class TypeResolver:
register(list, type_id=TypeId.LIST, serializer=ListSerializer)
register(set, type_id=TypeId.SET, serializer=SetSerializer)
register(dict, type_id=TypeId.MAP, serializer=MapSerializer)
- try:
- import pyarrow as pa
- from pyfory.format.serializer import (
- ArrowRecordBatchSerializer,
- ArrowTableSerializer,
- )
-
- register(
- pa.RecordBatch,
- type_id=TypeId.ARROW_RECORD_BATCH,
- serializer=ArrowRecordBatchSerializer,
- )
- register(pa.Table, type_id=TypeId.ARROW_TABLE,
serializer=ArrowTableSerializer)
- except Exception:
- pass
def register_type(
self,
diff --git a/python/pyfory/format/__init__.py b/python/pyfory/format/__init__.py
index 5b048fd76..6c9fb205d 100644
--- a/python/pyfory/format/__init__.py
+++ b/python/pyfory/format/__init__.py
@@ -60,6 +60,7 @@ try:
infer_data_type,
get_type_id,
compute_schema_hash,
+ from_arrow_schema,
to_arrow_schema,
)
from pyfory.format.encoder import ( # noqa: F401
diff --git a/python/pyfory/format/infer.py b/python/pyfory/format/infer.py
index 80051de49..f773f4c55 100644
--- a/python/pyfory/format/infer.py
+++ b/python/pyfory/format/infer.py
@@ -25,7 +25,11 @@ from pyfory.format._format import (
DataType,
TypeId,
boolean,
+ int8,
+ int16,
+ int32,
int64,
+ float32,
float64,
utf8,
binary,
@@ -98,6 +102,44 @@ _supported_types_mapping = {
datetime.datetime: timestamp,
}
+# Add pyfory type annotations support
+from pyfory.type import (
+ int8 as int8_type,
+ int16 as int16_type,
+ int32 as int32_type,
+ int64 as int64_type,
+ float32 as float32_type,
+ float64 as float64_type,
+)
+
+_supported_types_mapping.update(
+ {
+ int8_type: int8,
+ int16_type: int16,
+ int32_type: int32,
+ int64_type: int64,
+ float32_type: float32,
+ float64_type: float64,
+ }
+)
+
+# Add numpy types if available
+try:
+ import numpy as np
+
+ _supported_types_mapping.update(
+ {
+ np.int8: int8,
+ np.int16: int16,
+ np.int32: int32,
+ np.int64: int64,
+ np.float32: float32,
+ np.float64: float64,
+ }
+ )
+except ImportError:
+ pass
+
def infer_schema(clz, types_path=None) -> Schema:
types_path = list(types_path or [])
@@ -199,6 +241,76 @@ def _compute_hash(hash_: int, type_: DataType):
return hash_
+def from_arrow_schema(arrow_schema) -> Schema:
+ """Convert an Arrow Schema to a Fory Schema.
+
+ This is for compatibility with code that uses PyArrow schemas.
+
+ Args:
+ arrow_schema: A PyArrow Schema object.
+
+ Returns:
+ A Fory Schema object with the same structure.
+
+ Raises:
+ ImportError: If pyarrow is not available.
+ """
+ try:
+ from pyarrow import types as pa_types
+ except ImportError:
+ raise ImportError("pyarrow is required for Arrow schema conversion")
+
+ def convert_type(arrow_type) -> DataType:
+ if pa_types.is_boolean(arrow_type):
+ return boolean()
+ elif pa_types.is_int8(arrow_type):
+ from pyfory.format._format import int8
+
+ return int8()
+ elif pa_types.is_int16(arrow_type):
+ from pyfory.format._format import int16
+
+ return int16()
+ elif pa_types.is_int32(arrow_type):
+ from pyfory.format._format import int32
+
+ return int32()
+ elif pa_types.is_int64(arrow_type):
+ return int64()
+ elif pa_types.is_float32(arrow_type):
+ from pyfory.format._format import float32
+
+ return float32()
+ elif pa_types.is_float64(arrow_type):
+ return float64()
+ elif pa_types.is_string(arrow_type) or
pa_types.is_large_string(arrow_type):
+ return utf8()
+ elif pa_types.is_binary(arrow_type) or
pa_types.is_large_binary(arrow_type):
+ return binary()
+ elif pa_types.is_date32(arrow_type):
+ return date32()
+ elif pa_types.is_timestamp(arrow_type):
+ return timestamp()
+ elif pa_types.is_list(arrow_type) or
pa_types.is_large_list(arrow_type):
+ return list_(convert_type(arrow_type.value_type))
+ elif pa_types.is_map(arrow_type):
+ return map_(convert_type(arrow_type.key_type),
convert_type(arrow_type.item_type))
+ elif pa_types.is_struct(arrow_type):
+ fields = []
+ for i in range(arrow_type.num_fields):
+ f = arrow_type.field(i)
+ fields.append(field(f.name, convert_type(f.type),
nullable=f.nullable))
+ return struct(fields)
+ else:
+ raise TypeError(f"Unsupported Arrow type for Fory conversion:
{arrow_type}")
+
+ fory_fields = []
+ for i in range(len(arrow_schema)):
+ f = arrow_schema.field(i)
+ fory_fields.append(field(f.name, convert_type(f.type),
nullable=f.nullable))
+ return schema(fory_fields)
+
+
def to_arrow_schema(fory_schema: Schema):
"""Convert a Fory Schema to an Arrow Schema.
diff --git a/python/pyfory/format/schema.pxi b/python/pyfory/format/schema.pxi
index 1c2651901..2a9f49e9d 100644
--- a/python/pyfory/format/schema.pxi
+++ b/python/pyfory/format/schema.pxi
@@ -347,6 +347,38 @@ cdef class Schema:
def __repr__(self) -> str:
return f"Schema({self})"
+ def to_bytes(self) -> bytes:
+ """Serialize this schema to bytes.
+
+ Returns:
+ bytes: The serialized schema.
+ """
+ cdef vector[uint8_t] c_bytes = self.c_schema.get().ToBytes()
+ return bytes(c_bytes)
+
+ @staticmethod
+ def from_bytes(data) -> Schema:
+ """Deserialize a schema from bytes.
+
+ Args:
+ data: bytes containing serialized schema.
+
+ Returns:
+ Schema: The deserialized schema.
+ """
+ cdef const uint8_t* data_ptr
+ cdef Py_ssize_t data_len
+ cdef vector[uint8_t] c_bytes
+ if isinstance(data, bytes):
+ data_ptr = <const uint8_t*>(<bytes>data)
+ data_len = len(<bytes>data)
+ else:
+ py_bytes = bytes(data)
+ data_ptr = <const uint8_t*>py_bytes
+ data_len = len(py_bytes)
+ c_bytes.assign(data_ptr, data_ptr + data_len)
+ return Schema.wrap(CSchema.FromBytes(c_bytes))
+
# Factory functions for creating types
diff --git a/python/pyfory/format/serializer.py
b/python/pyfory/format/serializer.py
deleted file mode 100644
index aaaf1519f..000000000
--- a/python/pyfory/format/serializer.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import pyarrow as pa
-from pyfory.serializer import XlangCompatibleSerializer, BufferObject
-from pyfory.buffer import Buffer
-
-
-class ArrowRecordBatchSerializer(XlangCompatibleSerializer):
- def write(self, buffer, value: pa.RecordBatch):
- self.fory.write_buffer_object(buffer,
ArrowRecordBatchBufferObject(value))
-
- def read(self, buffer: Buffer) -> pa.Table:
- fory_buf = self.fory.read_buffer_object(buffer)
- # If the input source supports zero-copy reads (e.g. like a memory
- # map, or pa.BufferReader), then the returned batches are also
- # zero-copy and do not allocate any new memory on read.
- # So here the read is zero copy.
- reader = pa.ipc.open_stream(pa.py_buffer(fory_buf))
- [batch] = [batch for batch in reader]
- return batch
-
-
-class ArrowRecordBatchBufferObject(BufferObject):
- def __init__(self, batch: pa.RecordBatch):
- self.batch = batch
- mock_sink = pa.MockOutputStream()
- ArrowRecordBatchBufferObject._write(batch, mock_sink)
- self.nbytes = mock_sink.size()
-
- def total_bytes(self) -> int:
- return self.nbytes
-
- def write_to(self, stream):
- if isinstance(stream, Buffer):
- sink = pa.FixedSizeBufferWriter(pa.py_buffer(stream))
- else:
- sink = pa.BufferOutputStream()
- self._write(self.batch, sink)
- if not isinstance(stream, Buffer):
- data = sink.getvalue()
- if hasattr(stream, "write"):
- stream.write(data.to_pybytes())
-
- def getbuffer(self) -> memoryview:
- sink = pa.BufferOutputStream()
- ArrowRecordBatchBufferObject._write(self.batch, sink)
- arrow_buffer = sink.getvalue()
- return memoryview(arrow_buffer)
-
- @staticmethod
- def _write(batch, sink):
- stream_writer = pa.RecordBatchStreamWriter(sink, batch.schema)
- stream_writer.write_batch(batch)
- stream_writer.close()
-
-
-class ArrowTableSerializer(XlangCompatibleSerializer):
- def write(self, buffer, value: pa.Table):
- self.fory.write_buffer_object(buffer, ArrowTableBufferObject(value))
-
- def read(self, buffer: Buffer) -> pa.Table:
- fory_buf = self.fory.read_buffer_object(buffer)
- # If the input source supports zero-copy reads (e.g. like a memory
- # map, or pa.BufferReader), then the returned batches are also
- # zero-copy and do not allocate any new memory on read.
- # So here the read is zero copy.
- reader = pa.ipc.open_stream(pa.py_buffer(fory_buf))
- batches = [batch for batch in reader]
- return pa.Table.from_batches(batches)
-
-
-class ArrowTableBufferObject(BufferObject):
- def __init__(self, table: pa.Table):
- self.table = table
- mock_sink = pa.MockOutputStream()
- ArrowTableBufferObject._write(table, mock_sink)
- self.nbytes = mock_sink.size()
-
- def total_bytes(self) -> int:
- return self.nbytes
-
- def write_to(self, stream):
- if isinstance(stream, Buffer):
- sink = pa.FixedSizeBufferWriter(pa.py_buffer(stream))
- else:
- sink = pa.BufferOutputStream()
- ArrowTableBufferObject._write(self.table, sink)
- if not isinstance(stream, Buffer):
- data = sink.getvalue()
- if hasattr(stream, "write"):
- stream.write(data.to_pybytes())
-
- def getbuffer(self) -> memoryview:
- sink = pa.BufferOutputStream()
- self._write(self.table, sink)
- arrow_buffer = sink.getvalue()
- return memoryview(arrow_buffer)
-
- @staticmethod
- def _write(table, sink):
- stream_writer = pa.RecordBatchStreamWriter(sink, table.schema)
- for batch in table.to_batches():
- stream_writer.write_batch(batch)
- stream_writer.close()
diff --git a/python/pyfory/includes/libformat.pxd
b/python/pyfory/includes/libformat.pxd
index 15c8463c9..abb0e3930 100755
--- a/python/pyfory/includes/libformat.pxd
+++ b/python/pyfory/includes/libformat.pxd
@@ -77,8 +77,6 @@ cdef extern from "fory/type/type.h" namespace "fory" nogil:
FLOAT16_ARRAY = 35
FLOAT32_ARRAY = 36
FLOAT64_ARRAY = 37
- ARROW_RECORD_BATCH = 38
- ARROW_TABLE = 39
UNKNOWN = 64
BOUND = 64
@@ -192,6 +190,10 @@ cdef extern from "fory/row/schema.h" namespace "fory::row"
nogil:
c_string ToString()
c_bool Equals(const CSchema& other)
c_bool Equals(shared_ptr[CSchema] other)
+ # Schema serialization methods
+ vector[uint8_t] ToBytes() const
+ @staticmethod
+ shared_ptr[CSchema] FromBytes(const vector[uint8_t]& bytes)
ctypedef shared_ptr[CSchema] CSchemaPtr" fory::row::SchemaPtr"
diff --git a/python/pyfory/includes/libserialization.pxd
b/python/pyfory/includes/libserialization.pxd
index d25b37bf7..4f86119e9 100644
--- a/python/pyfory/includes/libserialization.pxd
+++ b/python/pyfory/includes/libserialization.pxd
@@ -60,8 +60,6 @@ cdef extern from "fory/type/type.h" namespace "fory" nogil:
FLOAT16_ARRAY = 35
FLOAT32_ARRAY = 36
FLOAT64_ARRAY = 37
- ARROW_RECORD_BATCH = 38
- ARROW_TABLE = 39
BOUND = 64
cdef c_bool IsNamespacedType(int32_t type_id)
diff --git a/python/pyfory/tests/test_cross_language.py
b/python/pyfory/tests/test_cross_language.py
index 66da57cb6..486e977a2 100644
--- a/python/pyfory/tests/test_cross_language.py
+++ b/python/pyfory/tests/test_cross_language.py
@@ -38,6 +38,18 @@ def debug_print(*params):
# print(*params)
+def to_dict(obj):
+ """Convert an object to a dictionary for comparison."""
+ if hasattr(obj, "as_dict"):
+ return obj.as_dict()
+ elif hasattr(obj, "__dataclass_fields__"):
+ from dataclasses import asdict
+
+ return asdict(obj)
+ else:
+ return obj
+
+
def cross_language_test(test_func):
env_key = "ENABLE_CROSS_LANGUAGE_TESTS"
test_func = pytest.mark.skipif(
@@ -52,45 +64,48 @@ Bar = pyfory.record_class_factory("Bar", ["f" + str(i) for
i in range(1, 3)])
def create_bar_schema():
- bar_schema = pa.schema(
+ from pyfory.format import field, int32, utf8, schema
+
+ bar_schema = schema(
[
- ("f1", pa.int32()),
- ("f2", pa.utf8()),
+ field("f1", int32()),
+ field("f2", utf8()),
]
)
return bar_schema
def create_foo_schema():
- foo_schema = pa.schema(
+ from pyfory.format import field, int32, utf8, list_, map_, struct, schema
+
+ bar_fields = [
+ field("f1", int32()),
+ field("f2", utf8()),
+ ]
+ foo_schema = schema(
[
- ("f1", pa.int32()),
- ("f2", pa.utf8()),
- ("f3", pa.list_(pa.utf8())),
- ("f4", pa.map_(pa.utf8(), pa.int32())),
- pa.field(
- "f5",
- pa.struct(create_bar_schema()),
- metadata={"cls": pyfory.get_qualified_classname(Bar)},
- ),
- ],
- metadata={"cls": pyfory.get_qualified_classname(Foo)},
+ field("f1", int32()),
+ field("f2", utf8()),
+ field("f3", list_(utf8())),
+ field("f4", map_(utf8(), int32())),
+ field("f5", struct(bar_fields)),
+ ]
)
return foo_schema
@dataclass
class FooPOJO:
- f1: "pa.int32"
+ f1: pyfory.int32
f2: str
f3: List[str]
- f4: Dict[str, "pa.int32"]
+ f4: Dict[str, pyfory.int32]
f5: "BarPOJO"
@dataclass
class BarPOJO:
- f1: "pa.int32"
+ f1: pyfory.int32
f2: str
@@ -113,7 +128,7 @@ def create_bar(cls):
@dataclass
class A:
- f1: "pa.int32"
+ f1: pyfory.int32
f2: Dict[str, str]
@@ -125,8 +140,10 @@ def test_map_encoder(data_file_path):
data_bytes = f.read()
obj = encoder.decode(data_bytes)
debug_print("deserialized obj", obj)
- assert a == obj
- assert encoder.decode(encoder.encode(a)) == a
+ # Compare by dict since decoder returns a record class, not the
original class
+ assert to_dict(obj) == to_dict(a)
+ decoded_round_trip = encoder.decode(encoder.encode(a))
+ assert to_dict(decoded_round_trip) == to_dict(a)
f.seek(0)
f.truncate()
f.write(encoder.encode(a))
@@ -141,7 +158,8 @@ def test_encoder_without_schema(data_file_path):
data_bytes = f.read()
obj = encoder.decode(data_bytes)
debug_print("deserialized foo", obj)
- assert foo == obj
+ # Compare by dict since decoder returns a record class, not the
original class
+ assert to_dict(obj) == to_dict(foo)
f.seek(0)
f.truncate()
f.write(encoder.encode(foo))
@@ -177,9 +195,11 @@ def test_serialization_without_schema(data_file_path,
schema=None):
@cross_language_test
def test_serialization_with_schema(schema_file_path, data_file_path):
+ from pyfory.format import Schema
+
with open(schema_file_path, "rb") as f:
schema_bytes = f.read()
- schema = pa.ipc.read_schema(pa.py_buffer(schema_bytes))
+ schema = Schema.from_bytes(schema_bytes)
debug_print("deserialized schema", schema)
test_serialization_without_schema(data_file_path, schema)
@@ -369,61 +389,6 @@ def test_cross_language_reference(data_file_path):
f.write(new_buf.get_bytes(0, new_buf.writer_index))
-@cross_language_test
-def test_serialize_arrow_in_band(data_file_path):
- with open(data_file_path, "rb") as f:
- batch = create_record_batch(2000)
- table = pa.Table.from_batches([batch] * 2)
- data_bytes = f.read()
- buffer = pyfory.Buffer(data_bytes)
- fory = pyfory.Fory(xlang=True, ref=True)
- new_batch = fory.deserialize(buffer)
- assert new_batch == batch
- new_table = fory.deserialize(buffer)
- assert table == new_table
-
-
-@cross_language_test
-def test_serialize_arrow_out_of_band(int_band_file, out_of_band_file):
- with open(int_band_file, "rb") as f:
- in_band_data_bytes = f.read()
- with open(out_of_band_file, "rb") as f:
- out_of_band_data_bytes = f.read()
- batch = create_record_batch(2000)
- table = pa.Table.from_batches([batch] * 2)
- in_band_buffer = pyfory.Buffer(in_band_data_bytes)
- out_of_band_buffer = pyfory.Buffer(out_of_band_data_bytes)
- len1, len2 = out_of_band_buffer.read_int32(),
out_of_band_buffer.read_int32()
- buffers = [
- out_of_band_buffer.slice(8, len1),
- out_of_band_buffer.slice(8 + len1, len2),
- ]
- fory = pyfory.Fory(xlang=True, ref=True)
- objects = fory.deserialize(in_band_buffer, buffers=buffers)
- assert objects == [batch, table]
- buffer_objects = []
- in_band_buffer = fory.serialize([batch, table],
buffer_callback=buffer_objects.append)
- buffers = [o.getbuffer() for o in buffer_objects]
- with open(int_band_file, "wb+") as f:
- f.write(in_band_buffer)
- with open(out_of_band_file, "wb+") as f:
- size_buf = pyfory.Buffer.allocate(8)
- size_buf.write_int32(len(buffers[0]))
- size_buf.write_int32(len(buffers[1]))
- f.write(size_buf)
- f.write(buffers[0])
- f.write(buffers[1])
-
-
-@cross_language_test
-def create_record_batch(size):
- data = [
- pa.array([bool(i % 2) for i in range(size)]),
- pa.array([f"test{i}" for i in range(size)]),
- ]
- return pa.RecordBatch.from_arrays(data, ["boolean", "varchar"])
-
-
@dataclass
class ComplexObject1:
f1: Any = None
diff --git a/python/pyfory/tests/test_serializer.py
b/python/pyfory/tests/test_serializer.py
index 5be23916c..b74ffe783 100644
--- a/python/pyfory/tests/test_serializer.py
+++ b/python/pyfory/tests/test_serializer.py
@@ -41,7 +41,6 @@ from pyfory.serializer import (
PyArraySerializer,
Numpy1DArraySerializer,
)
-from pyfory.tests.core import require_pyarrow
from pyfory.type import TypeId
from pyfory.util import lazy_import
@@ -339,45 +338,6 @@ def test_pickle():
print(f"reader_index {buf.reader_index}")
-@require_pyarrow
-def test_serialize_arrow():
- record_batch = create_record_batch(10000)
- table = pa.Table.from_batches([record_batch, record_batch])
- fory = Fory(xlang=True, ref=True)
- serialized_data = Buffer.allocate(32)
- fory.serialize(record_batch, buffer=serialized_data)
- fory.serialize(table, buffer=serialized_data)
- new_batch = fory.deserialize(serialized_data)
- new_table = fory.deserialize(serialized_data)
- assert new_batch == record_batch
- assert new_table == table
-
-
-@require_pyarrow
-def test_serialize_arrow_zero_copy():
- record_batch = create_record_batch(10000)
- table = pa.Table.from_batches([record_batch, record_batch])
- buffer_objects = []
- fory = Fory(xlang=True, ref=True)
- serialized_data = Buffer.allocate(32)
- fory.serialize(record_batch, buffer=serialized_data,
buffer_callback=buffer_objects.append)
- fory.serialize(table, buffer=serialized_data,
buffer_callback=buffer_objects.append)
- buffers = [o.getbuffer() for o in buffer_objects]
- new_batch = fory.deserialize(serialized_data, buffers=buffers[:1])
- new_table = fory.deserialize(serialized_data, buffers=buffers[1:])
- buffer_objects.clear()
- assert new_batch == record_batch
- assert new_table == table
-
-
-def create_record_batch(size):
- data = [
- pa.array([bool(i % 2) for i in range(size)]),
- pa.array([f"test{i}" for i in range(size)]),
- ]
- return pa.RecordBatch.from_arrays(data, ["boolean", "varchar"])
-
-
@dataclass
class Foo:
f1: int
diff --git a/python/pyfory/type.py b/python/pyfory/type.py
index 349a25cf2..3b842ae89 100644
--- a/python/pyfory/type.py
+++ b/python/pyfory/type.py
@@ -100,6 +100,17 @@ def record_class_factory(cls_name, field_names):
def __reduce__(self):
return self.__class__, tuple(self)
+ def as_dict(self):
+ """Convert record to a dictionary."""
+ result = {}
+ for name in self.__slots__:
+ value = getattr(self, name, None)
+ # Recursively convert nested records
+ if hasattr(value, "as_dict"):
+ value = value.as_dict()
+ result[name] = value
+ return result
+
cls_attrs = dict(
__slots__=field_names,
__init__=__init__,
@@ -109,6 +120,7 @@ def record_class_factory(cls_name, field_names):
__str__=__str__,
__repr__=__repr__,
__reduce__=__reduce__,
+ as_dict=as_dict,
)
cls_ = type(cls_name, (object,), cls_attrs)
@@ -212,10 +224,6 @@ class TypeId:
FLOAT32_ARRAY = 36
# one dimensional float64 array.
FLOAT64_ARRAY = 37
- # an arrow [record
batch](https://arrow.apache.org/docs/cpp/tables.html#record-batches) object.
- ARROW_RECORD_BATCH = 38
- # an arrow [table](https://arrow.apache.org/docs/cpp/tables.html#tables)
object.
- ARROW_TABLE = 39
UNKNOWN = 63
# BOUND id remains at 64
diff --git a/rust/fory-core/src/types.rs b/rust/fory-core/src/types.rs
index f3733bf0c..669bb333d 100644
--- a/rust/fory-core/src/types.rs
+++ b/rust/fory-core/src/types.rs
@@ -79,8 +79,6 @@ pub enum TypeId {
FLOAT16_ARRAY = 35,
FLOAT32_ARRAY = 36,
FLOAT64_ARRAY = 37,
- ARROW_RECORD_BATCH = 38,
- ARROW_TABLE = 39,
U8 = 64,
U16 = 65,
U32 = 66,
@@ -135,8 +133,6 @@ pub const INT64_ARRAY: u32 = TypeId::INT64_ARRAY as u32;
pub const FLOAT16_ARRAY: u32 = TypeId::FLOAT16_ARRAY as u32;
pub const FLOAT32_ARRAY: u32 = TypeId::FLOAT32_ARRAY as u32;
pub const FLOAT64_ARRAY: u32 = TypeId::FLOAT64_ARRAY as u32;
-pub const ARROW_RECORD_BATCH: u32 = TypeId::ARROW_RECORD_BATCH as u32;
-pub const ARROW_TABLE: u32 = TypeId::ARROW_TABLE as u32;
pub const U8: u32 = TypeId::U8 as u32;
pub const U16: u32 = TypeId::U16 as u32;
pub const U32: u32 = TypeId::U32 as u32;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]