This is an automated email from the ASF dual-hosted git repository.
pandalee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fury.git
The following commit(s) were added to refs/heads/main by this push:
new 2faede23 feat(java): support streaming encode/decode to/from buffer
for row format (#2024)
2faede23 is described below
commit 2faede237f3362e5c9221c5a272e0cd927bd8f46
Author: Shawn Yang <[email protected]>
AuthorDate: Sun Jan 26 00:52:22 2025 +0800
feat(java): support streaming encode/decode to/from buffer for row format
(#2024)
## What does this PR do?
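Support streaming encode/decode to/from a caller-supplied `MemoryBuffer` for the row format:
`Encoder` gains `encode(MemoryBuffer, T)` and `decode(MemoryBuffer)`, and each encoded row, array,
or map is prefixed with its 4-byte size so several values can be written to and read back from the
same buffer.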
## Related issues
Closes #2019
## Does this PR introduce any user-facing change?
<!--
If any user-facing interface changes, please [open an
issue](https://github.com/apache/fury/issues/new/choose) describing the
need for the change, and update the documentation if necessary.
-->
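- [x] Does this PR introduce any public API change? (`Encoder` gains `decode(MemoryBuffer)` and `encode(MemoryBuffer, T)`; the `BinaryArrayWriter(Field, MemoryBuffer)` constructor becomes public.)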
- [ ] Does this PR introduce any binary protocol compatibility change?
## Benchmark
<!--
When the PR has an impact on performance (if you don't know whether the
PR will have an impact on performance, you can submit the PR first, and
if it will have an impact on performance, the code reviewer will explain
it), be sure to attach benchmark data here.
-->
---
.../fury/format/encoder/ArrayEncoderBuilder.java | 3 +-
.../format/encoder/BaseBinaryEncoderBuilder.java | 71 ++++------
.../org/apache/fury/format/encoder/Encoder.java | 6 +
.../org/apache/fury/format/encoder/Encoders.java | 148 ++++++++++++++++++---
.../fury/format/encoder/MapEncoderBuilder.java | 55 ++++----
.../apache/fury/format/row/binary/BinaryMap.java | 21 ++-
.../row/binary/writer/BinaryArrayWriter.java | 6 +-
.../format/row/binary/writer/BinaryRowWriter.java | 2 +-
.../org/apache/fury/format/type/TypeInference.java | 29 ++--
.../fury/format/encoder/ArrayEncoderTest.java | 14 +-
.../fury/format/encoder/CodecBuilderTest.java | 20 +++
.../apache/fury/format/encoder/MapEncoderTest.java | 81 ++++++++++-
.../apache/fury/format/encoder/RowEncoderTest.java | 8 +-
.../java/org/apache/fury/test/bean/SimpleFoo.java} | 44 ++++--
14 files changed, 358 insertions(+), 150 deletions(-)
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java
index 990beecc..e6cf137c 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java
@@ -127,7 +127,8 @@ public class ArrayEncoderBuilder extends
BaseBinaryEncoderBuilder {
expressions.add(array);
Expression.Reference fieldExpr = new Expression.Reference(FIELD_NAME,
ARROW_FIELD_TYPE, false);
- Expression listExpression = serializeForArray(array, arrayWriter,
arrayToken, fieldExpr);
+ Expression listExpression =
+ serializeForArrayByWriter(array, arrayWriter, arrayToken, fieldExpr);
expressions.add(listExpression);
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java
index a9b6fc48..4f7f772d 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java
@@ -19,6 +19,7 @@
package org.apache.fury.format.encoder;
+import static org.apache.fury.type.TypeUtils.PRIMITIVE_INT_TYPE;
import static org.apache.fury.type.TypeUtils.getRawType;
import java.math.BigDecimal;
@@ -70,7 +71,6 @@ import org.apache.fury.util.Preconditions;
import org.apache.fury.util.StringUtils;
/** Base encoder builder for {@link Row}, {@link ArrayData} and {@link
MapData}. */
-@SuppressWarnings("UnstableApiUsage")
public abstract class BaseBinaryEncoderBuilder extends CodecBuilder {
protected static final String REFERENCES_NAME = "references";
protected static final TypeRef<Schema> SCHEMA_TYPE =
TypeRef.of(Schema.class);
@@ -222,22 +222,14 @@ public abstract class BaseBinaryEncoderBuilder extends
CodecBuilder {
}
}
- /**
- * Returns an expression to write iterable <code>inputObject</code> of type
<code>typeToken</code>
- * as {@link BinaryArray} using given <code>writer</code>.
- */
protected Expression serializeForArray(
Expression inputObject, Expression writer, TypeRef<?> typeRef,
Expression arrowField) {
- return serializeForArray(inputObject, writer, typeRef, arrowField, false);
+ Reference arrayWriter = getOrCreateArrayWriter(typeRef, arrowField,
writer);
+ return serializeForArrayByWriter(inputObject, arrayWriter, typeRef,
arrowField);
}
- protected Expression serializeForArray(
- Expression inputObject,
- Expression writer,
- TypeRef<?> typeRef,
- Expression arrowField,
- boolean reuse) {
- Reference arrayWriter = getOrCreateArrayWriter(typeRef, arrowField,
writer, reuse);
+ protected Expression serializeForArrayByWriter(
+ Expression inputObject, Expression arrayWriter, TypeRef<?> typeRef,
Expression arrowField) {
StaticInvoke arrayElementField =
new StaticInvoke(
DataTypes.class, "arrayElementField", "elemField",
ARROW_FIELD_TYPE, false, arrowField);
@@ -285,21 +277,8 @@ public abstract class BaseBinaryEncoderBuilder extends
CodecBuilder {
}
}
- /**
- * Get or create an ArrayWriter for given <code>type</code> and use
<code>writer</code> as parent
- * writer.
- */
protected Reference getOrCreateArrayWriter(
TypeRef<?> typeRef, Expression arrayDataType, Expression writer) {
- return getOrCreateArrayWriter(typeRef, arrayDataType, writer, false);
- }
-
- protected Reference getOrCreateArrayWriter(
- TypeRef<?> typeRef, Expression arrayDataType, Expression writer, boolean
reuse) {
- if (reuse) {
- return (Reference) writer;
- }
-
return arrayWriterMap.computeIfAbsent(
typeRef,
t -> {
@@ -344,39 +323,37 @@ public abstract class BaseBinaryEncoderBuilder extends
CodecBuilder {
TypeRef<?> keySetType =
supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE);
TypeRef<?> valuesType =
supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE);
+ ListExpression expressions = new ListExpression();
+
+ Invoke offset = new Invoke(writer, "writerIndex", "writerIndex",
TypeUtils.PRIMITIVE_INT_TYPE);
+ // preserve 8 bytes to write the key array numBytes later
+ Invoke preserve = new Invoke(writer, "writeDirectly", Literal.ofInt(-1));
+ expressions.add(offset, preserve);
+
Invoke keySet = new Invoke(inputObject, "keySet", keySetType);
Expression keySerializationExpr = serializeForArray(keySet, writer,
keySetType, keyArrayField);
+ expressions.add(keySet, keySerializationExpr);
+
+ expressions.add(
+ new Expression.Invoke(
+ writer,
+ "writeDirectly",
+ offset,
+ Expression.Invoke.inlineInvoke(keySerializationExpr, "size",
PRIMITIVE_INT_TYPE)));
Invoke values = new Invoke(inputObject, "values", valuesType);
Expression valueSerializationExpr =
serializeForArray(values, writer, valuesType, valueArrayField);
+ expressions.add(values, valueSerializationExpr);
- Invoke offset = new Invoke(writer, "writerIndex", "writerIndex",
TypeUtils.PRIMITIVE_INT_TYPE);
- // preserve 8 bytes to write the key array numBytes later
- Invoke preserve =
- new Invoke(writer, "writeDirectly", new Literal(-1,
TypeUtils.PRIMITIVE_INT_TYPE));
- Invoke writeKeyArrayNumBytes =
- new Invoke(
- writer,
- "writeDirectly",
- offset,
- new Invoke(keySerializationExpr, "size",
TypeUtils.PRIMITIVE_INT_TYPE));
Arithmetic size =
ExpressionUtils.subtract(
- new Invoke(writer, "writerIndex", "writerIndex",
TypeUtils.PRIMITIVE_INT_TYPE), offset);
+ new Invoke(writer, "writerIndex", "writerIndex",
PRIMITIVE_INT_TYPE), offset);
Invoke setOffsetAndSize = new Invoke(writer, "setOffsetAndSize", ordinal,
offset, size);
-
- ListExpression expression =
- new ListExpression(
- offset,
- preserve,
- keySerializationExpr,
- writeKeyArrayNumBytes,
- valueSerializationExpr,
- setOffsetAndSize);
+ expressions.add(setOffsetAndSize);
return new If(
- ExpressionUtils.eqNull(inputObject), new Invoke(writer, "setNullAt",
ordinal), expression);
+ ExpressionUtils.eqNull(inputObject), new Invoke(writer, "setNullAt",
ordinal), expressions);
}
/**
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
index 3e0d96c0..76863905 100644
--- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
+++ b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
@@ -19,6 +19,8 @@
package org.apache.fury.format.encoder;
+import org.apache.fury.memory.MemoryBuffer;
+
/**
* The encoding interface for encode/decode object to/from binary. The
implementation class must
* have a constructor with signature {@code Object[] references}, so we can
pass any params to
@@ -28,7 +30,11 @@ package org.apache.fury.format.encoder;
*/
public interface Encoder<T> {
+ T decode(MemoryBuffer buffer);
+
T decode(byte[] bytes);
byte[] encode(T obj);
+
+ void encode(MemoryBuffer buffer, T obj);
}
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java
index ea2329df..ac375f67 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java
@@ -19,10 +19,12 @@
package org.apache.fury.format.encoder;
+import static org.apache.fury.type.TypeUtils.OBJECT_TYPE;
import static org.apache.fury.type.TypeUtils.getRawType;
import java.util.Collection;
import java.util.HashSet;
+import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.arrow.util.Preconditions;
@@ -90,6 +92,11 @@ public class Encoders {
return encoder.toRow(obj);
}
+ @Override
+ public T decode(MemoryBuffer buffer) {
+ return encoder.decode(buffer);
+ }
+
@Override
public T decode(byte[] bytes) {
return encoder.decode(bytes);
@@ -99,6 +106,11 @@ public class Encoders {
public byte[] encode(T obj) {
return encoder.encode(obj);
}
+
+ @Override
+ public void encode(MemoryBuffer buffer, T obj) {
+ encoder.encode(buffer, obj);
+ }
};
}
@@ -150,8 +162,11 @@ public class Encoders {
}
@Override
- public T decode(byte[] bytes) {
- MemoryBuffer buffer = MemoryUtils.wrap(bytes);
+ public T decode(MemoryBuffer buffer) {
+ return decode(buffer, buffer.readInt32());
+ }
+
+ public T decode(MemoryBuffer buffer, int size) {
long peerSchemaHash = buffer.readInt64();
if (peerSchemaHash != schemaHash) {
throw new ClassNotCompatibleException(
@@ -162,10 +177,16 @@ public class Encoders {
schema, schemaHash, peerSchemaHash));
}
BinaryRow row = new BinaryRow(schema);
- row.pointTo(buffer, buffer.readerIndex(), buffer.size());
+ row.pointTo(buffer, buffer.readerIndex(), size);
+ buffer.increaseReaderIndex(size - 8);
return fromRow(row);
}
+ @Override
+ public T decode(byte[] bytes) {
+ return decode(MemoryUtils.wrap(bytes), bytes.length);
+ }
+
@Override
public byte[] encode(T obj) {
buffer.writerIndex(0);
@@ -175,6 +196,21 @@ public class Encoders {
BinaryRow row = toRow(obj);
return buffer.getBytes(0, 8 + row.getSizeInBytes());
}
+
+ @Override
+ public void encode(MemoryBuffer buffer, T obj) {
+ int writerIndex = buffer.writerIndex();
+ buffer.writeInt32(-1);
+ try {
+ buffer.writeInt64(schemaHash);
+ writer.setBuffer(buffer);
+ writer.reset();
+ toRow(obj);
+ buffer.putInt32(writerIndex, buffer.writerIndex() - writerIndex -
4);
+ } finally {
+ writer.setBuffer(this.buffer);
+ }
+ }
};
} catch (Exception e) {
String msg = String.format("Create encoder failed, \nbeanClass: %s",
beanClass);
@@ -192,7 +228,7 @@ public class Encoders {
* @return
*/
public static <T extends Collection> ArrayEncoder<T> arrayEncoder(TypeRef<T>
token) {
- return arrayEncoder(token, (Fury) null);
+ return arrayEncoder(token, null);
}
public static <T extends Collection> ArrayEncoder<T> arrayEncoder(TypeRef<T>
token, Fury fury) {
@@ -229,6 +265,11 @@ public class Encoders {
return encoder.toArray(obj);
}
+ @Override
+ public T decode(MemoryBuffer buffer) {
+ return encoder.decode(buffer);
+ }
+
@Override
public T decode(byte[] bytes) {
return encoder.decode(bytes);
@@ -238,6 +279,11 @@ public class Encoders {
public byte[] encode(T obj) {
return encoder.encode(obj);
}
+
+ @Override
+ public void encode(MemoryBuffer buffer, T obj) {
+ encoder.encode(buffer, obj);
+ }
};
}
@@ -299,19 +345,44 @@ public class Encoders {
}
@Override
- public T decode(byte[] bytes) {
- MemoryBuffer buffer = MemoryUtils.wrap(bytes);
+ public T decode(MemoryBuffer buffer) {
+ return decode(buffer, buffer.readInt32());
+ }
+
+ public T decode(MemoryBuffer buffer, int size) {
BinaryArray array = new BinaryArray(field);
- array.pointTo(buffer, buffer.readerIndex(), buffer.size());
+ int readerIndex = buffer.readerIndex();
+ array.pointTo(buffer, readerIndex, size);
+ buffer.readerIndex(readerIndex + size);
return fromArray(array);
}
+ @Override
+ public T decode(byte[] bytes) {
+ return decode(MemoryUtils.wrap(bytes), bytes.length);
+ }
+
@Override
public byte[] encode(T obj) {
- writer.reset(obj.size());
BinaryArray array = toArray(obj);
return writer.getBuffer().getBytes(0, 8 + array.getSizeInBytes());
}
+
+ @Override
+ public void encode(MemoryBuffer buffer, T obj) {
+ MemoryBuffer prevBuffer = writer.getBuffer();
+ int writerIndex = buffer.writerIndex();
+ buffer.writeInt32(-1);
+ try {
+ writer.setBuffer(buffer);
+ BinaryArray array = toArray(obj);
+ int size = buffer.writerIndex() - writerIndex - 4;
+ assert size == array.getSizeInBytes();
+ buffer.putInt32(writerIndex, size);
+ } finally {
+ writer.setBuffer(prevBuffer);
+ }
+ }
};
} catch (Exception e) {
String msg = String.format("Create encoder failed, \nelementType: %s",
elementType);
@@ -329,7 +400,7 @@ public class Encoders {
* @return
*/
public static <T extends Map> MapEncoder<T> mapEncoder(TypeRef<T> token) {
- return mapEncoder(token, (Fury) null);
+ return mapEncoder(token, null);
}
/**
@@ -348,17 +419,12 @@ public class Encoders {
public static <T extends Map> MapEncoder<T> mapEncoder(TypeRef<T> token,
Fury fury) {
Preconditions.checkNotNull(token);
-
Tuple2<TypeRef<?>, TypeRef<?>> tuple2 =
TypeUtils.getMapKeyValueType(token);
Set<TypeRef<?>> set1 = beanSet(tuple2.f0);
Set<TypeRef<?>> set2 = beanSet(tuple2.f1);
LOG.info("Find beans to load: {}, {}", set1, set2);
- if (set1.isEmpty() && set2.isEmpty()) {
- throw new IllegalArgumentException("can not find bean class.");
- }
-
TypeRef<?> keyToken = token4BeanLoad(set1, tuple2.f0);
TypeRef<?> valToken = token4BeanLoad(set2, tuple2.f1);
@@ -388,7 +454,7 @@ public class Encoders {
Field keyField = DataTypes.keyArrayFieldForMap(field);
Field valField = DataTypes.itemArrayFieldForMap(field);
BinaryArrayWriter keyWriter = new BinaryArrayWriter(keyField);
- BinaryArrayWriter valWriter = new BinaryArrayWriter(valField);
+ BinaryArrayWriter valWriter = new BinaryArrayWriter(valField,
keyWriter.getBuffer());
try {
Class<?> rowCodecClass = loadOrGenMapCodecClass(mapToken, keyToken,
valToken);
Object references = new Object[] {keyField, valField, keyWriter,
valWriter, fury, field};
@@ -421,17 +487,43 @@ public class Encoders {
}
@Override
- public T decode(byte[] bytes) {
- MemoryBuffer buffer = MemoryUtils.wrap(bytes);
+ public T decode(MemoryBuffer buffer) {
+ return decode(buffer, buffer.readInt32());
+ }
+
+ public T decode(MemoryBuffer buffer, int size) {
BinaryMap map = new BinaryMap(field);
- map.pointTo(buffer, 0, buffer.size());
+ int readerIndex = buffer.readerIndex();
+ map.pointTo(buffer, readerIndex, size);
+ buffer.readerIndex(readerIndex + size);
return fromMap(map);
}
+ @Override
+ public T decode(byte[] bytes) {
+ return decode(MemoryUtils.wrap(bytes), bytes.length);
+ }
+
@Override
public byte[] encode(T obj) {
BinaryMap map = toMap(obj);
- return map.getBuf().readBytes(map.getBuf().size());
+ return map.getBuf().getBytes(map.getBaseOffset(),
map.getSizeInBytes());
+ }
+
+ @Override
+ public void encode(MemoryBuffer buffer, T obj) {
+ MemoryBuffer prevBuffer = keyWriter.getBuffer();
+ int writerIndex = buffer.writerIndex();
+ buffer.writeInt32(-1);
+ try {
+ keyWriter.setBuffer(buffer);
+ valWriter.setBuffer(buffer);
+ toMap(obj);
+ buffer.putInt32(writerIndex, buffer.writerIndex() - writerIndex -
4);
+ } finally {
+ keyWriter.setBuffer(prevBuffer);
+ valWriter.setBuffer(prevBuffer);
+ }
}
};
} catch (Exception e) {
@@ -484,6 +576,11 @@ public class Encoders {
return encoder.toMap(obj);
}
+ @Override
+ public T decode(MemoryBuffer buffer) {
+ return encoder.decode(buffer);
+ }
+
@Override
public T decode(byte[] bytes) {
return encoder.decode(bytes);
@@ -493,12 +590,22 @@ public class Encoders {
public byte[] encode(T obj) {
return encoder.encode(obj);
}
+
+ @Override
+ public void encode(MemoryBuffer buffer, T obj) {
+ encoder.encode(buffer, obj);
+ }
};
}
private static void findBeanToken(TypeRef<?> typeRef,
java.util.Set<TypeRef<?>> set) {
+ Set<TypeRef<?>> visited = new LinkedHashSet<>();
while (TypeUtils.ITERABLE_TYPE.isSupertypeOf(typeRef)
|| TypeUtils.MAP_TYPE.isSupertypeOf(typeRef)) {
+ if (visited.contains(typeRef)) {
+ return;
+ }
+ visited.add(typeRef);
if (TypeUtils.ITERABLE_TYPE.isSupertypeOf(typeRef)) {
typeRef = TypeUtils.getElementType(typeRef);
if (TypeUtils.isBean(typeRef)) {
@@ -572,7 +679,8 @@ public class Encoders {
cls = getRawType(valueToken);
beanToken = valueToken;
} else {
- throw new IllegalArgumentException("not find bean class.");
+ cls = Object.class;
+ beanToken = OBJECT_TYPE;
}
// class name prefix
String prefix = TypeInference.inferTypeName(mapCls);
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java
index cbd4fb52..4690cebc 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java
@@ -20,6 +20,7 @@
package org.apache.fury.format.encoder;
import static org.apache.fury.type.TypeUtils.CLASS_TYPE;
+import static org.apache.fury.type.TypeUtils.PRIMITIVE_INT_TYPE;
import static org.apache.fury.type.TypeUtils.getRawType;
import java.util.Map;
@@ -40,7 +41,6 @@ import org.apache.fury.type.TypeUtils;
import org.apache.fury.util.StringUtils;
/** Expression builder for building jit map encoder class. */
-@SuppressWarnings("UnstableApiUsage")
public class MapEncoderBuilder extends BaseBinaryEncoderBuilder {
private static final Logger LOG =
LoggerFactory.getLogger(MapEncoderBuilder.class);
private static final String FIELD_NAME = "field";
@@ -168,17 +168,38 @@ public class MapEncoderBuilder extends
BaseBinaryEncoderBuilder {
Expression.Reference valFieldExpr =
new Expression.Reference(VALUE_FIELD_NAME, ARROW_FIELD_TYPE, false);
- Expression listExpression =
- directlySerializeMap(map, keyArrayWriter, valArrayWriter,
keyFieldExpr, valFieldExpr);
+ @SuppressWarnings("unchecked")
+ TypeRef<?> supertype = ((TypeRef<? extends Map<?, ?>>)
mapToken).getSupertype(Map.class);
+ TypeRef<?> keySetType =
supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE);
+ TypeRef<?> valuesType =
supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE);
+ Expression.Invoke keySet = new Expression.Invoke(map, "keySet",
keySetType);
+ Expression writerIndex =
+ new Expression.Invoke(keyArrayWriter, "writerIndex",
PRIMITIVE_INT_TYPE);
+ expressions.add(writerIndex);
+ expressions.add(
+ new Expression.Invoke(keyArrayWriter, "writeDirectly",
Expression.Literal.ofInt(-1)));
+ Expression keySerializationExpr =
+ serializeForArrayByWriter(keySet, keyArrayWriter, keySetType,
keyFieldExpr);
Expression.Invoke keyArray =
new Expression.Invoke(keyArrayWriter, "toArray",
TypeRef.of(BinaryArray.class));
+ expressions.add(map);
+ expressions.add(keySerializationExpr);
+ expressions.add(keyArray);
+ expressions.add(
+ new Expression.Invoke(
+ keyArrayWriter,
+ "writeDirectly",
+ writerIndex,
+ Expression.Invoke.inlineInvoke(keyArray, "getSizeInBytes",
PRIMITIVE_INT_TYPE)));
+
+ Expression.Invoke values = new Expression.Invoke(map, "values",
valuesType);
+ Expression valueSerializationExpr =
+ serializeForArrayByWriter(values, valArrayWriter, valuesType,
valFieldExpr);
Expression.Invoke valArray =
new Expression.Invoke(valArrayWriter, "toArray",
TypeRef.of(BinaryArray.class));
- expressions.add(map);
- expressions.add(listExpression);
- expressions.add(keyArray);
+ expressions.add(valueSerializationExpr);
expressions.add(valArray);
expressions.add(
new Expression.Return(
@@ -206,28 +227,6 @@ public class MapEncoderBuilder extends
BaseBinaryEncoderBuilder {
return expressions;
}
- private Expression directlySerializeMap(
- Expression map,
- Expression keyArrayWriter,
- Expression valArrayWriter,
- Expression keyFieldExpr,
- Expression valFieldExpr) {
- @SuppressWarnings("unchecked")
- TypeRef<?> supertype = ((TypeRef<? extends Map<?, ?>>)
mapToken).getSupertype(Map.class);
- TypeRef<?> keySetType =
supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE);
- TypeRef<?> valuesType =
supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE);
-
- Expression.Invoke keySet = new Expression.Invoke(map, "keySet",
keySetType);
- Expression keySerializationExpr =
- serializeForArray(keySet, keyArrayWriter, keySetType, keyFieldExpr,
true);
-
- Expression.Invoke values = new Expression.Invoke(map, "values",
valuesType);
- Expression valueSerializationExpr =
- serializeForArray(values, valArrayWriter, valuesType, valFieldExpr,
true);
-
- return new Expression.ListExpression(keySerializationExpr,
valueSerializationExpr);
- }
-
private Expression directlyDeserializeMap(
Expression map, Expression keyArrayRef, Expression valArrayRef) {
@SuppressWarnings("unchecked")
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java
b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java
index d0e9886c..bb41db29 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java
@@ -53,17 +53,10 @@ public class BinaryMap implements MapData {
this.keys = keys;
this.values = values;
this.field = field;
- this.baseOffset = 0;
+ this.buf = keys.getBuffer();
+ this.baseOffset = keys.getBaseOffset() - 8;
+ // memory of keys and values must be continuous.
this.sizeInBytes = keys.getSizeInBytes() + values.getSizeInBytes() + 8;
-
- MemoryBuffer copyBuf = MemoryUtils.buffer(sizeInBytes);
- copyBuf.putInt32(0, keys.getSizeInBytes());
- copyBuf.putInt32(4, 0);
- keys.getBuffer().copyTo(baseOffset, copyBuf, 8, keys.getSizeInBytes());
- values
- .getBuffer()
- .copyTo(baseOffset, copyBuf, keys.getSizeInBytes() + 8,
values.getSizeInBytes());
- this.buf = copyBuf;
}
public void pointTo(MemoryBuffer buf, int offset, int sizeInBytes) {
@@ -73,12 +66,14 @@ public class BinaryMap implements MapData {
// Read the numBytes of key array from the aligned first 8 bytes as int.
final int keyArrayBytes = buf.getInt32(offset);
assert keyArrayBytes >= 0 : "keyArrayBytes (" + keyArrayBytes + ") should
>= 0";
+ keys.pointTo(buf, offset + 8, keyArrayBytes);
final int valueArrayBytes = sizeInBytes - keyArrayBytes - 8;
assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ")
should >= 0";
-
- keys.pointTo(buf, offset + 8, keyArrayBytes);
values.pointTo(buf, offset + 8 + keyArrayBytes, valueArrayBytes);
- assert keys.numElements() == values.numElements();
+ if (keys.numElements() != values.numElements()) {
+ throw new UnsupportedOperationException();
+ }
+ // assert keys.numElements() == values.numElements();
}
public MemoryBuffer getBuf() {
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java
b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java
index 7cd46032..ea07fdc4 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java
@@ -72,7 +72,7 @@ public class BinaryArrayWriter extends BinaryWriter {
// there's no need to set `super.startIndex = writer.writerIndex();`
}
- private BinaryArrayWriter(Field field, MemoryBuffer buffer) {
+ public BinaryArrayWriter(Field field, MemoryBuffer buffer) {
super(buffer, 8);
this.field = field;
int width = DataTypes.getTypeWidth(field.getChildren().get(0).getType());
@@ -208,9 +208,7 @@ public class BinaryArrayWriter extends BinaryWriter {
public BinaryArray toArray() {
BinaryArray array = new BinaryArray(field);
int size = size();
- MemoryBuffer buffer = MemoryUtils.buffer(size);
- this.buffer.copyTo(startIndex, buffer, 0, size);
- array.pointTo(buffer, 0, size);
+ array.pointTo(buffer, startIndex, size);
return array;
}
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java
b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java
index 04bcd336..d641649f 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java
@@ -140,7 +140,7 @@ public class BinaryRowWriter extends BinaryWriter {
int size = size();
MemoryBuffer buffer = MemoryUtils.buffer(size);
this.buffer.copyTo(startIndex, buffer, 0, size);
- row.pointTo(buffer, 0, size);
+ row.pointTo(buffer, startIndex, size);
return row;
}
}
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java
b/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java
index c4e2e104..52491f21 100644
---
a/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java
+++
b/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java
@@ -45,7 +45,6 @@ import org.apache.fury.util.Preconditions;
import org.apache.fury.util.StringUtils;
/** Arrow related type inference. */
-@SuppressWarnings("UnstableApiUsage")
public class TypeInference {
public static Schema inferSchema(java.lang.reflect.Type type) {
@@ -240,24 +239,16 @@ public class TypeInference {
public static String inferTypeName(TypeRef<?> token) {
StringBuilder sb = new StringBuilder();
- TypeRef<?> arrayToken = token;
- while (TypeUtils.ITERABLE_TYPE.isSupertypeOf(arrayToken)
- || TypeUtils.MAP_TYPE.isSupertypeOf(arrayToken)) {
- if (TypeUtils.ITERABLE_TYPE.isSupertypeOf(arrayToken)) {
- sb.append(getRawType(arrayToken).getSimpleName());
- arrayToken = TypeUtils.getElementType(arrayToken);
- } else {
- Tuple2<TypeRef<?>, TypeRef<?>> tuple2 =
TypeUtils.getMapKeyValueType(arrayToken);
- sb.append("Map");
-
- if (!TypeUtils.isBean(tuple2.f0)) {
- arrayToken = tuple2.f0;
- }
-
- if (!TypeUtils.isBean(tuple2.f1)) {
- arrayToken = tuple2.f1;
- }
- }
+ if (TypeUtils.ITERABLE_TYPE.isSupertypeOf(token)) {
+ sb.append("Array_");
+ sb.append(inferTypeName(TypeUtils.getElementType(token)));
+ } else if (TypeUtils.MAP_TYPE.isSupertypeOf(token)) {
+ sb.append("Map_");
+ Tuple2<TypeRef<?>, TypeRef<?>> mapKeyValueType =
TypeUtils.getMapKeyValueType(token);
+ sb.append(inferTypeName(mapKeyValueType.f0));
+ sb.append("_").append(inferTypeName(mapKeyValueType.f1));
+ } else {
+ sb.append(token.getRawType().getSimpleName());
}
return sb.toString();
}
diff --git
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java
index 0af1891a..b10306df 100644
---
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java
+++
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java
@@ -19,6 +19,8 @@
package org.apache.fury.format.encoder;
+import static
org.apache.fury.format.encoder.CodecBuilderTest.testStreamingEncode;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -51,8 +53,10 @@ public class ArrayEncoderTest {
byte[] bs = encoder.encode(bars);
List<RowEncoderTest.Bar> bbars = encoder.decode(bs);
- Assert.assertEquals(bs.length, 280);
+ Assert.assertEquals(bs.length, 224);
Assert.assertEquals(bars, bbars);
+
+ testStreamingEncode(encoder, bars);
}
@Test
@@ -83,8 +87,10 @@ public class ArrayEncoderTest {
byte[] bs = encoder.encode(bars);
List<List<List<RowEncoderTest.Bar>>> bbars = encoder.decode(bs);
- Assert.assertEquals(bs.length, 1632);
+ Assert.assertEquals(bs.length, 1576);
Assert.assertEquals(bars, bbars);
+
+ testStreamingEncode(encoder, bars);
}
@Test
@@ -115,7 +121,9 @@ public class ArrayEncoderTest {
byte[] bs = encoder.encode(lmap);
List<List<Map<RowEncoderTest.Foo, List<RowEncoderTest.Bar>>>> blmap =
encoder.decode(bs);
- Assert.assertEquals(bs.length, 10920);
+ Assert.assertEquals(bs.length, 10824);
Assert.assertEquals(lmap, blmap);
+
+ testStreamingEncode(encoder, lmap);
}
}
diff --git
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java
index 648009f7..4e6530ce 100644
---
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java
+++
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java
@@ -20,8 +20,10 @@
package org.apache.fury.format.encoder;
import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertEquals;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.fury.memory.MemoryBuffer;
import org.apache.fury.test.bean.BeanA;
import org.apache.fury.test.bean.BeanB;
import org.apache.fury.test.bean.Foo;
@@ -45,4 +47,22 @@ public class CodecBuilderTest {
GeneratedRowEncoder.class.isAssignableFrom(
Encoders.loadOrGenRowCodecClass(AtomicLong.class)));
}
+
+ static void testStreamingEncode(Encoder encoder, Object object) {
+ MemoryBuffer buffer = MemoryBuffer.newHeapBuffer(32);
+ for (int i = 0; i < 1; i++) {
+ buffer.writerIndex(0);
+ buffer.readerIndex(0);
+ for (int j = 0; j <= i; j++) {
+ buffer.writerIndex(0);
+ buffer.readerIndex(0);
+ buffer.writeByte(-1);
+ buffer.readByte();
+ encoder.encode(buffer, object);
+ encoder.encode(buffer, object);
+ assertEquals(object, encoder.decode(buffer));
+ assertEquals(object, encoder.decode(buffer));
+ }
+ }
+ }
}
diff --git
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java
index 53ff55c4..4b305d39 100644
---
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java
+++
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java
@@ -19,6 +19,10 @@
package org.apache.fury.format.encoder;
+import static
org.apache.fury.format.encoder.CodecBuilderTest.testStreamingEncode;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -26,6 +30,8 @@ import java.util.List;
import java.util.Map;
import org.apache.fury.format.row.binary.BinaryMap;
import org.apache.fury.reflect.TypeRef;
+import org.apache.fury.test.bean.Foo;
+import org.apache.fury.test.bean.SimpleFoo;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -45,7 +51,6 @@ public class MapEncoderTest {
Encoders.mapEncoder(bars.getClass(), String.class,
RowEncoderTest.Bar.class);
BinaryMap array = encoder.toMap(bars);
Map<String, RowEncoderTest.Bar> newBars = encoder.fromMap(array);
-
Assert.assertEquals(bars, newBars);
byte[] bytes = encoder.encode(bars);
@@ -83,6 +88,78 @@ public class MapEncoderTest {
Assert.assertEquals(decodeMap.size(), 5);
}
+ /** Streams a map whose values are integer lists through a buffer and back. */
+ @Test
+ public void testSimpleNestArrayWithMapEncoder1() {
+ Map<String, List<Integer>> value = new HashMap<>();
+ value.put("k1", ImmutableList.of(1, 2));
+ MapEncoder<Map<String, List<Integer>>> mapEncoder =
+ Encoders.mapEncoder(new TypeRef<Map<String, List<Integer>>>() {});
+ testStreamingEncode(mapEncoder, value);
+ }
+
+ /** Streams a map whose values are lists of integer lists through a buffer and back. */
+ @Test
+ public void testSimpleNestArrayWithMapEncoder2() {
+ Map<String, List<List<Integer>>> value = new HashMap<>();
+ List<Integer> first = ImmutableList.of(1, 2);
+ List<Integer> second = ImmutableList.of(1, 2);
+ value.put("k1", ImmutableList.of(first, second));
+ MapEncoder<Map<String, List<List<Integer>>>> mapEncoder =
+ Encoders.mapEncoder(new TypeRef<Map<String, List<List<Integer>>>>() {});
+ testStreamingEncode(mapEncoder, value);
+ }
+
+ /** Streams a map with a {@code Foo} struct value through a buffer and back. */
+ @Test
+ public void testSimpleStructWithMapEncoder2() {
+ Map<String, Foo> value = new HashMap<>();
+ value.put("k1", Foo.create());
+ MapEncoder<Map<String, Foo>> mapEncoder =
+ Encoders.mapEncoder(new TypeRef<Map<String, Foo>>() {});
+ testStreamingEncode(mapEncoder, value);
+ }
+
+ /** Streams a map whose values are lists of {@code Foo} structs through a buffer and back. */
+ @Test
+ public void testSimpleNestStructWithMapEncoder() {
+ Map<String, List<Foo>> value = new HashMap<>();
+ List<Foo> foos = ImmutableList.of(Foo.create());
+ value.put("k1", foos);
+ MapEncoder<Map<String, List<Foo>>> mapEncoder =
+ Encoders.mapEncoder(new TypeRef<Map<String, List<Foo>>>() {});
+ testStreamingEncode(mapEncoder, value);
+ }
+
+ /**
+ * Streams maps whose keys and values are both structs ({@code SimpleFoo}, then {@code Foo})
+ * through a buffer and back.
+ */
+ @Test
+ public void testKVStructMap() {
+ Map<SimpleFoo, SimpleFoo> simpleMap = ImmutableMap.of(SimpleFoo.create(), SimpleFoo.create());
+ // Parameterized (not raw) encoder types keep the test type-checked.
+ MapEncoder<Map<SimpleFoo, SimpleFoo>> simpleEncoder =
+ Encoders.mapEncoder(new TypeRef<Map<SimpleFoo, SimpleFoo>>() {});
+ testStreamingEncode(simpleEncoder, simpleMap);
+ MapEncoder<Map<Foo, Foo>> fooEncoder = Encoders.mapEncoder(new TypeRef<Map<Foo, Foo>>() {});
+ testStreamingEncode(fooEncoder, ImmutableMap.of(Foo.create(), Foo.create()));
+ }
+
+ /** Streams a list of {@code SimpleFoo}-keyed, {@code SimpleFoo}-valued maps through a buffer. */
+ @Test
+ public void testSimpleNestKVStructMapArray() {
+ ArrayEncoder<List<Map<SimpleFoo, SimpleFoo>>> listEncoder =
+ Encoders.arrayEncoder(new TypeRef<List<Map<SimpleFoo, SimpleFoo>>>() {});
+ Map<SimpleFoo, SimpleFoo> entry = ImmutableMap.of(SimpleFoo.create(), SimpleFoo.create());
+ testStreamingEncode(listEncoder, ImmutableList.of(entry));
+ }
+
+ /** Streams a map whose values are lists of {@code Foo}-keyed, {@code Foo}-valued maps. */
+ @Test
+ public void testSimpleNestKVStruct() {
+ Map<String, List<Map<Foo, Foo>>> value = new HashMap<>();
+ Map<Foo, Foo> inner = ImmutableMap.of(Foo.create(), Foo.create());
+ value.put("k1", ImmutableList.of(inner));
+ MapEncoder<Map<String, List<Map<Foo, Foo>>>> mapEncoder =
+ Encoders.mapEncoder(new TypeRef<Map<String, List<Map<Foo, Foo>>>>() {});
+ testStreamingEncode(mapEncoder, value);
+ }
+
@Test
public void testNestArrayWithMapEncoder() {
Map<String, List<Map<RowEncoderTest.Foo, List<RowEncoderTest.Bar>>>> lmap
= new HashMap<>();
@@ -112,5 +189,7 @@ public class MapEncoderTest {
Map<String, List<Map<RowEncoderTest.Foo, List<RowEncoderTest.Bar>>>>
decodeMap =
encoder.decode(bytes);
Assert.assertEquals(decodeMap.size(), 10);
+
+ testStreamingEncode(encoder, lmap);
}
}
diff --git
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java
index 973804d5..9a16954b 100644
---
a/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java
+++
b/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java
@@ -19,6 +19,8 @@
package org.apache.fury.format.encoder;
+import static
org.apache.fury.format.encoder.CodecBuilderTest.testStreamingEncode;
+
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
@@ -41,21 +43,23 @@ public class RowEncoderTest {
Encoders.bean(AtomicLong.class);
{
RowEncoder<BeanA> encoder = Encoders.bean(BeanA.class);
+ BeanA beanA = BeanA.createBeanA(2);
for (int i = 0; i < 3; i++) {
- BeanA beanA = BeanA.createBeanA(2);
BinaryRow row = encoder.toRow(beanA);
BeanA newBean = encoder.fromRow(row);
Assert.assertEquals(beanA, newBean);
}
+ testStreamingEncode(encoder, beanA);
}
{
RowEncoder<BeanB> encoder = Encoders.bean(BeanB.class);
+ BeanB beanB = BeanB.createBeanB(2);
for (int i = 0; i < 3; i++) {
- BeanB beanB = BeanB.createBeanB(2);
BinaryRow row = encoder.toRow(beanB);
BeanB newBean = encoder.fromRow(row);
Assert.assertEquals(beanB, newBean);
}
+ testStreamingEncode(encoder, beanB);
}
}
diff --git
a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
b/java/fury-test-core/src/main/java/org/apache/fury/test/bean/SimpleFoo.java
similarity index 53%
copy from
java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
copy to
java/fury-test-core/src/main/java/org/apache/fury/test/bean/SimpleFoo.java
index 3e0d96c0..5e760751 100644
--- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java
+++ b/java/fury-test-core/src/main/java/org/apache/fury/test/bean/SimpleFoo.java
@@ -17,18 +17,40 @@
* under the License.
*/
-package org.apache.fury.format.encoder;
+package org.apache.fury.test.bean;
-/**
- * The encoding interface for encode/decode object to/from binary. The
implementation class must
- * have a constructor with signature {@code Object[] references}, so we can
pass any params to
- * codec.
- *
- * @param <T> type of value
- */
-public interface Encoder<T> {
+import java.util.Objects;
+
+/**
+ * Minimal mutable test bean with one {@code int} and one {@code String} field. It is used as a
+ * struct key/value type in row-format encoder tests.
+ */
+public class SimpleFoo {
+ public int f1;
+ public String f2;
+
+ // equals/hashCode compare both fields, so decoded copies compare equal to the originals and
+ // instances can serve as map keys.
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SimpleFoo foo = (SimpleFoo) o;
+ return f1 == foo.f1 && Objects.equals(f2, foo.f2);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(f1, f2);
+ }
- T decode(byte[] bytes);
+ @Override
+ public String toString() {
+ return "SimpleFoo{" + "f1=" + f1 + ", f2='" + f2 + '\'' + '}';
+ }
- byte[] encode(T obj);
+ /** Creates an instance with fixed field values: {@code f1 = 10}, {@code f2 = "str"}. */
+ public static SimpleFoo create() {
+ SimpleFoo foo = new SimpleFoo();
+ foo.f1 = 10;
+ foo.f2 = "str";
+ return foo;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]