Jefffrey commented on code in PR #18754:
URL: https://github.com/apache/datafusion/pull/18754#discussion_r2532495133
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc {
     }
 }
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
-    }
-}
-
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
Review Comment:
These `encode_scalar`, `encode_array`, `decode_scalar`, `decode_array`
functions should now be simpler, since we've removed consideration of string
types (we expect the signature to coerce them to binary for us)
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -79,7 +81,17 @@ impl Default for EncodeFunc {
impl EncodeFunc {
pub fn new() -> Self {
Self {
- signature: Signature::user_defined(Volatility::Immutable),
+ signature: Signature::coercible(
Review Comment:
Signature change here; same as for decode
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc {
     }
 }
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
-    }
-}
-
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    BASE64_ENGINE
-        .decode_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => {
+            let array = array.as_binary::<i32>();
+            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
+        }
+        DataType::BinaryView => {
+            let array = array.as_binary_view();
+            // Don't know if there is a more strict upper bound we can infer
+            // for view arrays byte data size.
+            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
+        }
+        DataType::LargeBinary => {
+            let array = array.as_binary::<i64>();
+            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
+        }
+        dt => {
+            internal_err!("Unexpected data type for decode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-macro_rules! encode_to_array {
-    ($METHOD: ident, $INPUT:expr) => {{
-        let utf8_array: StringArray = $INPUT
-            .iter()
-            .map(|x| x.map(|x| $METHOD(x.as_ref())))
-            .collect();
-        Arc::new(utf8_array)
-    }};
+#[derive(Debug, Copy, Clone)]
+enum Encoding {
+    Base64,
+    Hex,
 }
-fn decode_to_array<F, T: ByteArrayType>(
-    method: F,
-    input: &GenericByteArray<T>,
-    conservative_upper_bound_size: usize,
-) -> Result<ArrayRef>
-where
-    F: Fn(&[u8], &mut [u8]) -> Result<usize>,
-{
-    let mut values = vec![0; conservative_upper_bound_size];
-    let mut offsets = OffsetBufferBuilder::new(input.len());
-    let mut total_bytes_decoded = 0;
-    for v in input {
-        if let Some(v) = v {
-            let cursor = &mut values[total_bytes_decoded..];
-            let decoded = method(v.as_ref(), cursor)?;
-            total_bytes_decoded += decoded;
-            offsets.push_length(decoded);
-        } else {
-            offsets.push_length(0);
-        }
+impl fmt::Display for Encoding {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", format!("{self:?}").to_lowercase())
     }
-    // We reserved an upper bound size for the values buffer, but we only use the actual size
-    values.truncate(total_bytes_decoded);
-    let binary_array = BinaryArray::try_new(
-        offsets.finish(),
-        Buffer::from_vec(values),
-        input.nulls().cloned(),
-    )?;
-    Ok(Arc::new(binary_array))
 }
-impl Encoding {
-    fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => ScalarValue::Utf8(value.map(|v| BASE64_ENGINE.encode(v))),
-            Self::Hex => ScalarValue::Utf8(value.map(hex::encode)),
-        })
-    }
-
-    fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => {
-                ScalarValue::LargeUtf8(value.map(|v| BASE64_ENGINE.encode(v)))
+impl TryFrom<&ColumnarValue> for Encoding {
+    type Error = DataFusionError;
+
+    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
Review Comment:
I merged the `FromStr` impl into this, since a `ColumnarValue` is pretty much
the only thing we care about parsing an encoding from (no need for the
two-step process of extracting a string from the `ColumnarValue` and then
parsing that string; just consolidate it into a single `TryFrom`)
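
For reference, a rough sketch of how a call site can now look; the argument
order and the `invoke_encode` name are my assumptions for illustration, not
code from the PR:

```rust
// Hedged sketch only: assumes the value is the first argument and the
// encoding the second, as in encode(expr, 'hex'); encode_array and
// encode_scalar are the helpers introduced in this PR.
fn invoke_encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
    let encoding = Encoding::try_from(&args[1])?;
    match &args[0] {
        ColumnarValue::Array(array) => encode_array(array, encoding),
        ColumnarValue::Scalar(scalar) => encode_scalar(scalar, encoding),
    }
}
```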
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc {
     }
 }
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
-    }
-}
-
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    BASE64_ENGINE
-        .decode_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => {
+            let array = array.as_binary::<i32>();
+            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
+        }
+        DataType::BinaryView => {
+            let array = array.as_binary_view();
+            // Don't know if there is a more strict upper bound we can infer
+            // for view arrays byte data size.
+            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
Review Comment:
For views I don't know whether we can compute a tighter estimate of how many
bytes are actually represented by the array
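
If the extra pass is acceptable, an exact figure could be computed by walking
the views; a sketch only (not part of the PR), trading an O(n) scan for the
tighter bound:

```rust
use arrow::array::BinaryViewArray;

/// Sums the value lengths of a view array; exact, but requires an extra
/// O(n) pass compared to using the buffer memory size as the bound.
fn view_byte_data_size(array: &BinaryViewArray) -> usize {
    array
        .iter()
        .map(|value| value.map(|bytes| bytes.len()).unwrap_or(0))
        .sum()
}
```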
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc {
     }
 }
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
-    }
-}
-
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
Review Comment:
Previously we just took the length of the values buffer; this estimate should
be tighter in the best case (e.g. for sliced arrays), and in the worst case we
just end up with the same estimate as before
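
A small illustration of the difference (a hypothetical test, not one from the
PR, using the `estimate_byte_data_size` helper added here):

```rust
#[test]
fn estimate_counts_only_the_sliced_bytes() {
    use arrow::array::BinaryArray;

    // The backing values buffer holds 7 bytes, but the slice only covers
    // "bbbb", so the offset-based estimate is 4 rather than 7.
    let array = BinaryArray::from_iter_values([
        b"aa".as_slice(),
        b"bbbb".as_slice(),
        b"c".as_slice(),
    ]);
    let sliced = array.slice(1, 1);
    assert_eq!(estimate_byte_data_size(&sliced), 4);
}
```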
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc {
     }
 }
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
-    }
-}
-
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    BASE64_ENGINE
-        .decode_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => {
+            let array = array.as_binary::<i32>();
+            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
+        }
+        DataType::BinaryView => {
+            let array = array.as_binary_view();
+            // Don't know if there is a more strict upper bound we can infer
+            // for view arrays byte data size.
+            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
+        }
+        DataType::LargeBinary => {
+            let array = array.as_binary::<i64>();
+            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
+        }
+        dt => {
+            internal_err!("Unexpected data type for decode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-macro_rules! encode_to_array {
-    ($METHOD: ident, $INPUT:expr) => {{
-        let utf8_array: StringArray = $INPUT
-            .iter()
-            .map(|x| x.map(|x| $METHOD(x.as_ref())))
-            .collect();
-        Arc::new(utf8_array)
-    }};
+#[derive(Debug, Copy, Clone)]
+enum Encoding {
+    Base64,
+    Hex,
 }
-fn decode_to_array<F, T: ByteArrayType>(
-    method: F,
-    input: &GenericByteArray<T>,
-    conservative_upper_bound_size: usize,
-) -> Result<ArrayRef>
-where
-    F: Fn(&[u8], &mut [u8]) -> Result<usize>,
-{
-    let mut values = vec![0; conservative_upper_bound_size];
-    let mut offsets = OffsetBufferBuilder::new(input.len());
-    let mut total_bytes_decoded = 0;
-    for v in input {
-        if let Some(v) = v {
-            let cursor = &mut values[total_bytes_decoded..];
-            let decoded = method(v.as_ref(), cursor)?;
-            total_bytes_decoded += decoded;
-            offsets.push_length(decoded);
-        } else {
-            offsets.push_length(0);
-        }
+impl fmt::Display for Encoding {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", format!("{self:?}").to_lowercase())
     }
-    // We reserved an upper bound size for the values buffer, but we only use the actual size
-    values.truncate(total_bytes_decoded);
-    let binary_array = BinaryArray::try_new(
-        offsets.finish(),
-        Buffer::from_vec(values),
-        input.nulls().cloned(),
-    )?;
-    Ok(Arc::new(binary_array))
 }
-impl Encoding {
-    fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => ScalarValue::Utf8(value.map(|v| BASE64_ENGINE.encode(v))),
-            Self::Hex => ScalarValue::Utf8(value.map(hex::encode)),
-        })
-    }
-
-    fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => {
-                ScalarValue::LargeUtf8(value.map(|v| BASE64_ENGINE.encode(v)))
+impl TryFrom<&ColumnarValue> for Encoding {
+    type Error = DataFusionError;
+
+    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
+        let encoding = match encoding {
+            ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
+                Some(encoding) => encoding,
+                _ => return exec_err!("Encoding must be a non-null string"),
+            },
+            ColumnarValue::Array(_) => {
+                return not_impl_err!("Encoding must be a scalar; array specified encoding is not yet supported")
             }
-            Self::Hex => ScalarValue::LargeUtf8(value.map(hex::encode)),
-        })
-    }
-
-    fn encode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
-    where
-        T: OffsetSizeTrait,
-    {
-        let input_value = as_generic_binary_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
         };
-        Ok(ColumnarValue::Array(array))
-    }
-
-    fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
-    where
-        T: OffsetSizeTrait,
-    {
-        let input_value = as_generic_string_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
+        match encoding {
+            "base64" => Ok(Self::Base64),
+            "hex" => Ok(Self::Hex),
+            _ => {
+                let options = [Self::Base64, Self::Hex]
+                    .iter()
+                    .map(|i| i.to_string())
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                plan_err!(
+                    "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
+                )
+            }
+        }
     }
+}
-    fn decode_scalar(self, value: Option<&[u8]>) -> Result<ColumnarValue> {
-        let value = match value {
-            Some(value) => value,
-            None => return Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))),
-        };
-
-        let out = match self {
-            Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using base64: {e}")
-            })?,
-            Self::Hex => hex::decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using hex: {e}")
-            })?,
-        };
-
-        Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(out))))
+impl Encoding {
+    fn encode_bytes(self, value: &[u8]) -> String {
+        match self {
+            Self::Base64 => BASE64_ENGINE.encode(value),
+            Self::Hex => hex::encode(value),
+        }
     }
-    fn decode_large_scalar(self, value: Option<&[u8]>) -> Result<ColumnarValue> {
-        let value = match value {
-            Some(value) => value,
-            None => return Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(None))),
-        };
-
-        let out = match self {
+    fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
+        match self {
             Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using base64: {e}")
-            })?,
+                exec_datafusion_err!("Failed to decode value using base64: {e}")
+            }),
             Self::Hex => hex::decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using hex: {e}")
-            })?,
-        };
-
-        Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(out))))
+                exec_datafusion_err!("Failed to decode value using hex: {e}")
+            }),
+        }
     }
-    fn decode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+    // OutputOffset important to ensure Large types output Large arrays
+    fn encode_array<'a, InputBinaryArray, OutputOffset>(
+        self,
+        array: &InputBinaryArray,
+    ) -> Result<ArrayRef>
     where
-        T: OffsetSizeTrait,
+        InputBinaryArray: BinaryArrayType<'a>,
+        OutputOffset: OffsetSizeTrait,
     {
-        let input_value = as_generic_binary_array::<T>(value)?;
-        let array = self.decode_byte_array(input_value)?;
-        Ok(ColumnarValue::Array(array))
+        match self {
+            Self::Base64 => {
+                let array: GenericStringArray<OutputOffset> = array
+                    .iter()
+                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+                    .collect();
+                Ok(Arc::new(array))
+            }
+            Self::Hex => {
+                let array: GenericStringArray<OutputOffset> =
+                    array.iter().map(|x| x.map(hex::encode)).collect();
+                Ok(Arc::new(array))
+            }
+        }
     }
-    fn decode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+    // OutputOffset important to ensure Large types output Large arrays
+    fn decode_array<'a, InputBinaryArray, OutputOffset>(
+        self,
+        value: &InputBinaryArray,
+        approx_data_size: usize,
+    ) -> Result<ArrayRef>
     where
-        T: OffsetSizeTrait,
+        InputBinaryArray: BinaryArrayType<'a>,
+        OutputOffset: OffsetSizeTrait,
     {
-        let input_value = as_generic_string_array::<T>(value)?;
-        let array = self.decode_byte_array(input_value)?;
-        Ok(ColumnarValue::Array(array))
-    }
+        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+            // only write input / 2 bytes to buf
+            let out_len = input.len() / 2;
+            let buf = &mut buf[..out_len];
+            hex::decode_to_slice(input, buf).map_err(|e| {
+                internal_datafusion_err!("Failed to decode from hex: {e}")
+            })?;
+            Ok(out_len)
+        }
+
+        fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+            BASE64_ENGINE.decode_slice(input, buf).map_err(|e| {
+                internal_datafusion_err!("Failed to decode from base64: {e}")
+            })
+        }
-    fn decode_byte_array<T: ByteArrayType>(
-        &self,
-        input_value: &GenericByteArray<T>,
-    ) -> Result<ArrayRef> {
         match self {
             Self::Base64 => {
-                let upper_bound =
-                    base64::decoded_len_estimate(input_value.values().len());
-                decode_to_array(base64_decode, input_value, upper_bound)
+                let upper_bound = base64::decoded_len_estimate(approx_data_size);
+                delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
             }
             Self::Hex => {
                 // Calculate the upper bound for decoded byte size
                 // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
                 // So the upper bound is half the length of the input values.
-                let upper_bound = input_value.values().len() / 2;
-                decode_to_array(hex_decode, input_value, upper_bound)
+                let upper_bound = approx_data_size / 2;
+                delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
             }
         }
     }
 }
-impl fmt::Display for Encoding {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "{}", format!("{self:?}").to_lowercase())
+fn delegated_decode<'a, DecodeFunction, InputBinaryArray, OutputOffset>(
+    decode: DecodeFunction,
+    input: &InputBinaryArray,
+    conservative_upper_bound_size: usize,
+) -> Result<ArrayRef>
+where
+    DecodeFunction: Fn(&[u8], &mut [u8]) -> Result<usize>,
+    InputBinaryArray: BinaryArrayType<'a>,
+    OutputOffset: OffsetSizeTrait,
+{
+    let mut values = vec![0; conservative_upper_bound_size];
+    let mut offsets = OffsetBufferBuilder::new(input.len());
+    let mut total_bytes_decoded = 0;
+    for v in input.iter() {
+        if let Some(v) = v {
+            let cursor = &mut values[total_bytes_decoded..];
+            let decoded = decode(v, cursor)?;
+            total_bytes_decoded += decoded;
+            offsets.push_length(decoded);
+        } else {
+            offsets.push_length(0);
+        }
     }
+    // We reserved an upper bound size for the values buffer, but we only use the actual size
+    values.truncate(total_bytes_decoded);
+    let binary_array = GenericBinaryArray::<OutputOffset>::try_new(
Review Comment:
Previously we always returned a `BinaryArray`, which is what led to the issues
for large types; now we take into account whether the input was a large type
and return accordingly (views are treated as a small type)
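
A small illustration of the behaviour change (a hypothetical test, not one
from the PR):

```rust
#[test]
fn decode_preserves_large_offsets() -> Result<()> {
    use std::sync::Arc;
    use arrow::array::{Array, ArrayRef, LargeBinaryArray};
    use arrow::datatypes::DataType;

    // "6869" is hex for "hi"; the input is LargeBinary, so the decoded
    // output should now also be LargeBinary rather than a plain BinaryArray.
    let input: ArrayRef =
        Arc::new(LargeBinaryArray::from_iter_values([b"6869".as_slice()]));
    let decoded = decode_array(&input, Encoding::Hex)?;
    if let ColumnarValue::Array(array) = decoded {
        assert_eq!(array.data_type(), &DataType::LargeBinary);
    }
    Ok(())
}
```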