alamb commented on code in PR #18754:
URL: https://github.com/apache/datafusion/pull/18754#discussion_r2628651585
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -232,324 +210,286 @@ impl ScalarUDFImpl for DecodeFunc {
}
}
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
-    }
-}
-
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes) | ScalarValue::BinaryView(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    BASE64_ENGINE
-        .decode_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => {
+            let array = array.as_binary::<i32>();
+            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
+        }
+        DataType::BinaryView => {
+            let array = array.as_binary_view();
+            // Don't know if there is a more strict upper bound we can infer
+            // for view arrays byte data size.
+            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())

Review Comment:
   I don't know of any way to do so without actually summing the sizes of all the views
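
   For reference, "summing the sizes of all the views" would look roughly like the sketch below (a hypothetical helper, not part of this PR). The length of each view sits in the low 32 bits of its packed `u128`, so the sum gives an upper bound on the encoded data size without touching the data buffers:

   ```rust
   use arrow::array::BinaryViewArray;

   /// Hypothetical helper: upper bound on the number of data bytes referenced by a
   /// BinaryViewArray, computed by summing the length field of every view.
   /// Null slots still contribute their (usually zero) lengths, so this can
   /// over-estimate, similar to `estimate_byte_data_size` for offset-based arrays.
   fn estimate_view_data_size(array: &BinaryViewArray) -> usize {
       array
           .views()
           .iter()
           .map(|view| (*view as u32) as usize)
           .sum()
   }
   ```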
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -238,339 +209,387 @@ impl ScalarUDFImpl for DecodeFunc {
}
}
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            DataType::FixedSizeBinary(_) => {
-                encoding.encode_fixed_size_binary_array(a.as_ref())
-            }
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                ScalarValue::FixedSizeBinary(_, a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes)
+        | ScalarValue::BinaryView(maybe_bytes)
+        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
+        }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
         }
-    }
+        DataType::FixedSizeBinary(_) => {
+            encoding.encode_fsb_array(array.as_fixed_size_binary())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes)
+        | ScalarValue::BinaryView(maybe_bytes)
+        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => {
+            let array = array.as_binary::<i32>();
+            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
+        }
+        DataType::BinaryView => {
+            let array = array.as_binary_view();
+            // Don't know if there is a more strict upper bound we can infer
+            // for view arrays byte data size.
+            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
+        }
+        DataType::LargeBinary => {
+            let array = array.as_binary::<i64>();
+            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
+        }
+        DataType::FixedSizeBinary(size) => {
+            let array = array.as_fixed_size_binary();
+            // TODO: could we be more conservative by accounting for nulls?
+            let estimate = array.len().saturating_mul(*size as usize);
+            encoding.decode_fsb_array(array, estimate)
+        }
+        dt => {
+            internal_err!("Unexpected data type for decode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    BASE64_ENGINE
-        .decode_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
+#[derive(Debug, Copy, Clone)]
+enum Encoding {
+    Base64,
+    Hex,
 }
-macro_rules! encode_to_array {
-    ($METHOD: ident, $INPUT:expr) => {{
-        let utf8_array: StringArray = $INPUT
-            .iter()
-            .map(|x| x.map(|x| $METHOD(x.as_ref())))
-            .collect();
-        Arc::new(utf8_array)
-    }};
+impl fmt::Display for Encoding {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", format!("{self:?}").to_lowercase())
+    }
 }
-fn decode_to_array<F, T: ByteArrayType>(
-    method: F,
-    input: &GenericByteArray<T>,
-    conservative_upper_bound_size: usize,
-) -> Result<ArrayRef>
-where
-    F: Fn(&[u8], &mut [u8]) -> Result<usize>,
-{
-    let mut values = vec![0; conservative_upper_bound_size];
-    let mut offsets = OffsetBufferBuilder::new(input.len());
-    let mut total_bytes_decoded = 0;
-    for v in input {
-        if let Some(v) = v {
-            let cursor = &mut values[total_bytes_decoded..];
-            let decoded = method(v.as_ref(), cursor)?;
-            total_bytes_decoded += decoded;
-            offsets.push_length(decoded);
-        } else {
-            offsets.push_length(0);
+impl TryFrom<&ColumnarValue> for Encoding {
+    type Error = DataFusionError;
+
+    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
+        let encoding = match encoding {
+            ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
+                Some(encoding) => encoding,
+                _ => return exec_err!("Encoding must be a non-null string"),
+            },
+            ColumnarValue::Array(_) => {
+                return not_impl_err!(
+                    "Encoding must be a scalar; array specified encoding is not yet supported"
+                );
+            }
+        };
+        match encoding {
+            "base64" => Ok(Self::Base64),
+            "hex" => Ok(Self::Hex),
+            _ => {
+                let options = [Self::Base64, Self::Hex]
+                    .iter()
+                    .map(|i| i.to_string())
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                plan_err!(
+                    "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
+                )
+            }
         }
     }
-    // We reserved an upper bound size for the values buffer, but we only use the actual size
-    values.truncate(total_bytes_decoded);
-    let binary_array = BinaryArray::try_new(
-        offsets.finish(),
-        Buffer::from_vec(values),
-        input.nulls().cloned(),
-    )?;
-    Ok(Arc::new(binary_array))
 }
 impl Encoding {
-    fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => ScalarValue::Utf8(value.map(|v| BASE64_ENGINE.encode(v))),
-            Self::Hex => ScalarValue::Utf8(value.map(hex::encode)),
-        })
+    fn encode_bytes(self, value: &[u8]) -> String {
+        match self {
+            Self::Base64 => BASE64_ENGINE.encode(value),
+            Self::Hex => hex::encode(value),
+        }
     }
-    fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => {
-                ScalarValue::LargeUtf8(value.map(|v| BASE64_ENGINE.encode(v)))
-            }
-            Self::Hex => ScalarValue::LargeUtf8(value.map(hex::encode)),
-        })
+    fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
+        match self {
+            Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
+                exec_datafusion_err!("Failed to decode value using base64: {e}")
+            }),
+            Self::Hex => hex::decode(value).map_err(|e| {
+                exec_datafusion_err!("Failed to decode value using hex: {e}")
+            }),
+        }
     }
-    fn encode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+    // OutputOffset important to ensure Large types output Large arrays
+    fn encode_array<'a, InputBinaryArray, OutputOffset>(
+        self,
+        array: &InputBinaryArray,
+    ) -> Result<ArrayRef>
     where
-        T: OffsetSizeTrait,
+        InputBinaryArray: BinaryArrayType<'a>,
+        OutputOffset: OffsetSizeTrait,
     {
-        let input_value = as_generic_binary_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
+        match self {
+            Self::Base64 => {
+                let array: GenericStringArray<OutputOffset> = array
+                    .iter()
+                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+                    .collect();
+                Ok(Arc::new(array))
+            }
+            Self::Hex => {
+                let array: GenericStringArray<OutputOffset> =
+                    array.iter().map(|x| x.map(hex::encode)).collect();
+                Ok(Arc::new(array))
+            }
+        }
     }
-    fn encode_fixed_size_binary_array(self, value: &dyn Array) -> Result<ColumnarValue> {
-        let input_value = as_fixed_size_binary_array(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
+    // TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
+    fn encode_fsb_array(self, array: &FixedSizeBinaryArray) -> Result<ArrayRef> {
+        match self {
+            Self::Base64 => {
+                let array: GenericStringArray<i32> = array
+                    .iter()
+                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+                    .collect();
+                Ok(Arc::new(array))
+            }
+            Self::Hex => {
+                let array: GenericStringArray<i32> =
+                    array.iter().map(|x| x.map(hex::encode)).collect();
+                Ok(Arc::new(array))
+            }
+        }
     }
-    fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+    // OutputOffset important to ensure Large types output Large arrays
+    fn decode_array<'a, InputBinaryArray, OutputOffset>(
+        self,
+        value: &InputBinaryArray,
+        approx_data_size: usize,
+    ) -> Result<ArrayRef>
     where
-        T: OffsetSizeTrait,
+        InputBinaryArray: BinaryArrayType<'a>,
+        OutputOffset: OffsetSizeTrait,
     {
-        let input_value = as_generic_string_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
-    }
-
-    fn decode_scalar(self, value: Option<&[u8]>) -> Result<ColumnarValue> {
-        let value = match value {
-            Some(value) => value,
-            None => return Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))),
-        };
+        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+            // only write input / 2 bytes to buf
+            let out_len = input.len() / 2;
+            let buf = &mut buf[..out_len];
+            hex::decode_to_slice(input, buf).map_err(|e| {
+                internal_datafusion_err!("Failed to decode from hex: {e}")

Review Comment:
   Not introduced by this PR, but I think this should be a normal execution error (it happens on bad input data) rather than an internal error. Same for the other errors below
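
   A sketch of that change, assuming the helper otherwise keeps its current shape (only the error constructor differs from the code in this PR):

   ```rust
   use datafusion_common::{exec_datafusion_err, Result};

   // Bad input data surfaces as an execution error rather than an internal error.
   fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
       // hex decoding writes input.len() / 2 bytes into buf
       let out_len = input.len() / 2;
       let buf = &mut buf[..out_len];
       hex::decode_to_slice(input, buf)
           .map_err(|e| exec_datafusion_err!("Failed to decode from hex: {e}"))?;
       Ok(out_len)
   }
   ```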
##########
datafusion/functions/src/encoding/inner.rs:
##########
@@ -238,339 +209,387 @@ impl ScalarUDFImpl for DecodeFunc {
}
}
-#[derive(Debug, Copy, Clone)]
-enum Encoding {
-    Base64,
-    Hex,
-}
-
-fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
+fn encode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
     match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.encode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.encode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.encode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.encode_binary_array::<i64>(a.as_ref()),
-            DataType::FixedSizeBinary(_) => {
-                encoding.encode_fixed_size_binary_array(a.as_ref())
-            }
-            other => exec_err!(
-                "Unsupported data type {other:?} for function encode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::LargeUtf8(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes()))),
-                ScalarValue::Utf8View(a) => {
-                    Ok(encoding.encode_scalar(a.as_ref().map(|s: &String| s.as_bytes())))
-                }
-                ScalarValue::Binary(a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                ScalarValue::LargeBinary(a) => Ok(encoding
-                    .encode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))),
-                ScalarValue::FixedSizeBinary(_, a) => Ok(
-                    encoding.encode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                ),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function encode({encoding})"
-                ),
-            }
+        ScalarValue::Binary(maybe_bytes)
+        | ScalarValue::BinaryView(maybe_bytes)
+        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Utf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
         }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeUtf8(
+                maybe_bytes
+                    .as_ref()
+                    .map(|bytes| encoding.encode_bytes(bytes)),
+            )))
+        }
+        v => internal_err!("Unexpected value for encode: {v}"),
     }
 }
-fn decode_process(value: &ColumnarValue, encoding: Encoding) -> Result<ColumnarValue> {
-    match value {
-        ColumnarValue::Array(a) => match a.data_type() {
-            DataType::Utf8 => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::LargeUtf8 => encoding.decode_utf8_array::<i64>(a.as_ref()),
-            DataType::Utf8View => encoding.decode_utf8_array::<i32>(a.as_ref()),
-            DataType::Binary => encoding.decode_binary_array::<i32>(a.as_ref()),
-            DataType::LargeBinary => encoding.decode_binary_array::<i64>(a.as_ref()),
-            other => exec_err!(
-                "Unsupported data type {other:?} for function decode({encoding})"
-            ),
-        },
-        ColumnarValue::Scalar(scalar) => {
-            match scalar {
-                ScalarValue::Utf8(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::LargeUtf8(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|s: &String| s.as_bytes())),
-                ScalarValue::Utf8View(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|s: &String| s.as_bytes()))
-                }
-                ScalarValue::Binary(a) => {
-                    encoding.decode_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice()))
-                }
-                ScalarValue::LargeBinary(a) => encoding
-                    .decode_large_scalar(a.as_ref().map(|v: &Vec<u8>| v.as_slice())),
-                other => exec_err!(
-                    "Unsupported data type {other:?} for function decode({encoding})"
-                ),
-            }
+fn encode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => encoding.encode_array::<_, i32>(&array.as_binary::<i32>()),
+        DataType::BinaryView => encoding.encode_array::<_, i32>(&array.as_binary_view()),
+        DataType::LargeBinary => {
+            encoding.encode_array::<_, i64>(&array.as_binary::<i64>())
         }
-    }
+        DataType::FixedSizeBinary(_) => {
+            encoding.encode_fsb_array(array.as_fixed_size_binary())
+        }
+        dt => {
+            internal_err!("Unexpected data type for encode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn hex_encode(input: &[u8]) -> String {
-    hex::encode(input)
+fn decode_scalar(value: &ScalarValue, encoding: Encoding) -> Result<ColumnarValue> {
+    match value {
+        ScalarValue::Binary(maybe_bytes)
+        | ScalarValue::BinaryView(maybe_bytes)
+        | ScalarValue::FixedSizeBinary(_, maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::Binary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        ScalarValue::LargeBinary(maybe_bytes) => {
+            Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(
+                maybe_bytes
+                    .as_ref()
+                    .map(|x| encoding.decode_bytes(x))
+                    .transpose()?,
+            )))
+        }
+        v => internal_err!("Unexpected value for decode: {v}"),
+    }
 }
-fn base64_encode(input: &[u8]) -> String {
-    BASE64_ENGINE.encode(input)
+/// Estimate how many bytes are actually represented by the array; in case the
+/// the array slices it's internal buffer, this returns the byte size of that slice
+/// but not the byte size of the entire buffer.
+///
+/// This is an estimation only as it can estimate higher if null slots are non-zero
+/// sized.
+fn estimate_byte_data_size<O: OffsetSizeTrait>(array: &GenericBinaryArray<O>) -> usize {
+    let offsets = array.value_offsets();
+    // Unwraps are safe as should always have 1 element in offset buffer
+    let start = *offsets.first().unwrap();
+    let end = *offsets.last().unwrap();
+    let data_size = end - start;
+    data_size.as_usize()
 }
-fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    // only write input / 2 bytes to buf
-    let out_len = input.len() / 2;
-    let buf = &mut buf[..out_len];
-    hex::decode_to_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from hex: {e}"))?;
-    Ok(out_len)
+fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> {
+    let array = match array.data_type() {
+        DataType::Binary => {
+            let array = array.as_binary::<i32>();
+            encoding.decode_array::<_, i32>(&array, estimate_byte_data_size(array))
+        }
+        DataType::BinaryView => {
+            let array = array.as_binary_view();
+            // Don't know if there is a more strict upper bound we can infer
+            // for view arrays byte data size.
+            encoding.decode_array::<_, i32>(&array, array.get_buffer_memory_size())
+        }
+        DataType::LargeBinary => {
+            let array = array.as_binary::<i64>();
+            encoding.decode_array::<_, i64>(&array, estimate_byte_data_size(array))
+        }
+        DataType::FixedSizeBinary(size) => {
+            let array = array.as_fixed_size_binary();
+            // TODO: could we be more conservative by accounting for nulls?
+            let estimate = array.len().saturating_mul(*size as usize);
+            encoding.decode_fsb_array(array, estimate)
+        }
+        dt => {
+            internal_err!("Unexpected data type for decode: {dt}")
+        }
+    };
+    array.map(ColumnarValue::Array)
 }
-fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
-    BASE64_ENGINE
-        .decode_slice(input, buf)
-        .map_err(|e| internal_datafusion_err!("Failed to decode from base64: {e}"))
+#[derive(Debug, Copy, Clone)]
+enum Encoding {
+    Base64,
+    Hex,
 }
-macro_rules! encode_to_array {
-    ($METHOD: ident, $INPUT:expr) => {{
-        let utf8_array: StringArray = $INPUT
-            .iter()
-            .map(|x| x.map(|x| $METHOD(x.as_ref())))
-            .collect();
-        Arc::new(utf8_array)
-    }};
+impl fmt::Display for Encoding {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{}", format!("{self:?}").to_lowercase())
+    }
 }
-fn decode_to_array<F, T: ByteArrayType>(
-    method: F,
-    input: &GenericByteArray<T>,
-    conservative_upper_bound_size: usize,
-) -> Result<ArrayRef>
-where
-    F: Fn(&[u8], &mut [u8]) -> Result<usize>,
-{
-    let mut values = vec![0; conservative_upper_bound_size];
-    let mut offsets = OffsetBufferBuilder::new(input.len());
-    let mut total_bytes_decoded = 0;
-    for v in input {
-        if let Some(v) = v {
-            let cursor = &mut values[total_bytes_decoded..];
-            let decoded = method(v.as_ref(), cursor)?;
-            total_bytes_decoded += decoded;
-            offsets.push_length(decoded);
-        } else {
-            offsets.push_length(0);
+impl TryFrom<&ColumnarValue> for Encoding {
+    type Error = DataFusionError;
+
+    fn try_from(encoding: &ColumnarValue) -> Result<Self> {
+        let encoding = match encoding {
+            ColumnarValue::Scalar(encoding) => match encoding.try_as_str().flatten() {
+                Some(encoding) => encoding,
+                _ => return exec_err!("Encoding must be a non-null string"),
+            },
+            ColumnarValue::Array(_) => {
+                return not_impl_err!(
+                    "Encoding must be a scalar; array specified encoding is not yet supported"
+                );
+            }
+        };
+        match encoding {
+            "base64" => Ok(Self::Base64),
+            "hex" => Ok(Self::Hex),
+            _ => {
+                let options = [Self::Base64, Self::Hex]
+                    .iter()
+                    .map(|i| i.to_string())
+                    .collect::<Vec<_>>()
+                    .join(", ");
+                plan_err!(
+                    "There is no built-in encoding named '{encoding}', currently supported encodings are: {options}"
+                )
+            }
         }
     }
-    // We reserved an upper bound size for the values buffer, but we only use the actual size
-    values.truncate(total_bytes_decoded);
-    let binary_array = BinaryArray::try_new(
-        offsets.finish(),
-        Buffer::from_vec(values),
-        input.nulls().cloned(),
-    )?;
-    Ok(Arc::new(binary_array))
 }
 impl Encoding {
-    fn encode_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => ScalarValue::Utf8(value.map(|v| BASE64_ENGINE.encode(v))),
-            Self::Hex => ScalarValue::Utf8(value.map(hex::encode)),
-        })
+    fn encode_bytes(self, value: &[u8]) -> String {
+        match self {
+            Self::Base64 => BASE64_ENGINE.encode(value),
+            Self::Hex => hex::encode(value),
+        }
     }
-    fn encode_large_scalar(self, value: Option<&[u8]>) -> ColumnarValue {
-        ColumnarValue::Scalar(match self {
-            Self::Base64 => {
-                ScalarValue::LargeUtf8(value.map(|v| BASE64_ENGINE.encode(v)))
-            }
-            Self::Hex => ScalarValue::LargeUtf8(value.map(hex::encode)),
-        })
+    fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> {
+        match self {
+            Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
+                exec_datafusion_err!("Failed to decode value using base64: {e}")
+            }),
+            Self::Hex => hex::decode(value).map_err(|e| {
+                exec_datafusion_err!("Failed to decode value using hex: {e}")
+            }),
+        }
     }
-    fn encode_binary_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+    // OutputOffset important to ensure Large types output Large arrays
+    fn encode_array<'a, InputBinaryArray, OutputOffset>(
+        self,
+        array: &InputBinaryArray,
+    ) -> Result<ArrayRef>
     where
-        T: OffsetSizeTrait,
+        InputBinaryArray: BinaryArrayType<'a>,
+        OutputOffset: OffsetSizeTrait,
     {
-        let input_value = as_generic_binary_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
+        match self {
+            Self::Base64 => {
+                let array: GenericStringArray<OutputOffset> = array
+                    .iter()
+                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+                    .collect();
+                Ok(Arc::new(array))
+            }
+            Self::Hex => {
+                let array: GenericStringArray<OutputOffset> =
+                    array.iter().map(|x| x.map(hex::encode)).collect();
+                Ok(Arc::new(array))
+            }
+        }
     }
-    fn encode_fixed_size_binary_array(self, value: &dyn Array) -> Result<ColumnarValue> {
-        let input_value = as_fixed_size_binary_array(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
+    // TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
+    fn encode_fsb_array(self, array: &FixedSizeBinaryArray) -> Result<ArrayRef> {
+        match self {
+            Self::Base64 => {
+                let array: GenericStringArray<i32> = array
+                    .iter()
+                    .map(|x| x.map(|x| BASE64_ENGINE.encode(x)))
+                    .collect();
+                Ok(Arc::new(array))
+            }
+            Self::Hex => {
+                let array: GenericStringArray<i32> =
+                    array.iter().map(|x| x.map(hex::encode)).collect();
+                Ok(Arc::new(array))
+            }
+        }
     }
-    fn encode_utf8_array<T>(self, value: &dyn Array) -> Result<ColumnarValue>
+    // OutputOffset important to ensure Large types output Large arrays
+    fn decode_array<'a, InputBinaryArray, OutputOffset>(
+        self,
+        value: &InputBinaryArray,
+        approx_data_size: usize,
+    ) -> Result<ArrayRef>
     where
-        T: OffsetSizeTrait,
+        InputBinaryArray: BinaryArrayType<'a>,
+        OutputOffset: OffsetSizeTrait,
     {
-        let input_value = as_generic_string_array::<T>(value)?;
-        let array: ArrayRef = match self {
-            Self::Base64 => encode_to_array!(base64_encode, input_value),
-            Self::Hex => encode_to_array!(hex_encode, input_value),
-        };
-        Ok(ColumnarValue::Array(array))
-    }
-
-    fn decode_scalar(self, value: Option<&[u8]>) -> Result<ColumnarValue> {
-        let value = match value {
-            Some(value) => value,
-            None => return Ok(ColumnarValue::Scalar(ScalarValue::Binary(None))),
-        };
+        fn hex_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+            // only write input / 2 bytes to buf
+            let out_len = input.len() / 2;
+            let buf = &mut buf[..out_len];
+            hex::decode_to_slice(input, buf).map_err(|e| {
+                internal_datafusion_err!("Failed to decode from hex: {e}")
+            })?;
+            Ok(out_len)
+        }
-        let out = match self {
-            Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using base64: {e}")
-            })?,
-            Self::Hex => hex::decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using hex: {e}")
-            })?,
-        };
+        fn base64_decode(input: &[u8], buf: &mut [u8]) -> Result<usize> {
+            BASE64_ENGINE.decode_slice(input, buf).map_err(|e| {
+                internal_datafusion_err!("Failed to decode from base64: {e}")
+            })
+        }
-        Ok(ColumnarValue::Scalar(ScalarValue::Binary(Some(out))))
+        match self {
+            Self::Base64 => {
+                let upper_bound = base64::decoded_len_estimate(approx_data_size);
+                delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound)
+            }
+            Self::Hex => {
+                // Calculate the upper bound for decoded byte size
+                // For hex encoding, each pair of hex characters (2 bytes) represents 1 byte when decoded
+                // So the upper bound is half the length of the input values.
+                let upper_bound = approx_data_size / 2;
+                delegated_decode::<_, _, OutputOffset>(hex_decode, value, upper_bound)
+            }
+        }
     }
-    fn decode_large_scalar(self, value: Option<&[u8]>) -> Result<ColumnarValue> {
-        let value = match value {
-            Some(value) => value,
-            None => return Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(None))),
-        };
-
-        let out = match self {
-            Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using base64: {e}")
-            })?,
-            Self::Hex => hex::decode(value).map_err(|e| {
-                internal_datafusion_err!("Failed to decode value using hex: {e}")
-            })?,
-        };
-
-        Ok(ColumnarValue::Scalar(ScalarValue::LargeBinary(Some(out))))
-    }
+    // TODO: refactor this away once https://github.com/apache/arrow-rs/pull/8993 lands
Review Comment:
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]